本文以mina 2.0.7为基础,记录mina nio网络接口的内部结构及消息流。
- public static void main(String[] args) {
- SocketAcceptor acceptor = new NioSocketAcceptor();
- acceptor.getFilterChain().addLast(
- "text", new Business0Filter();
- acceptor.setHandler(new ServerHandler());
- try {
- acceptor.bind(new InetSocketAddress(PORT));
- } catch (IOException e) {
- acceptor.dispose();
- }
- }
这是一个典型的基于mina的服务端程序。Mina的大体结构还是很清晰的,绑定一个端口接收消息,经过一个Filter链(上行)预处理request,最后在Handler完成业务逻辑,然后反向经过Filter链(下行)封装response,直至返回客户端。Mina支持TCP/UDP,本文只记录TCP(面向连接)的消息流。
看一下NioSocketAcceptor的内部结构概念图;
可以看出,整体上还是采用多Selector的架构。
Acceptor线程
在NioSocketAcceptor中有个Acceptor线程,它负责将一个ServerSocketChannel注册到主Selector中,注册事件为OP_ACCEPT,当有OP_ACCEPT响应时,取出相应的socket,封装在NioSocketSession中,并给这个NioSocketSession对象分配一个NioProcessor;
SimpleIoProcessorPool
NioSocketAcceptor维护的一个NioProcessor的对象池,这个对象池缺省状态下维护了Runtime.getRuntime().availableProcessors() + 1个NioProcessor对象,每个运行时建立的NioSocketSession对象都会分配一个NioProcessor对象,这个NioProcessor对象与NioSocketSession对象是一对多的关系;
NioProcessor
每个NioProcessor对象内部维护了一个Selector对象及Processor线程,除此还有一些数据结构,用于数据处理,主要的有;
- Queue<S> newSessions = new ConcurrentLinkedQueue<S>()
- Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>()
其中,newSessions就是当给NioSocketSession对象分配NioProcessor时,将此NioSocketSession对象添加到此newSessions queue中,同时将对应的channel以OP_READ注册到这个NioProcessor对象的Selector,给session分配NioProcessor的逻辑是;
- private IoProcessor<S> getProcessor(S session) {
- IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
- if (processor == null) {
- if (disposed || disposing) {
- throw new IllegalStateException("A disposed processor cannot be accessed.");
- }
- processor = pool[Math.abs((int) session.getId()) % pool.length];
- if (processor == null) {
- throw new IllegalStateException("A disposed processor cannot be accessed.");
- }
- session.setAttributeIfAbsent(PROCESSOR, processor);
- }
- return processor;
- }
这个session id是生成session时产生的。
NioProcessor对象的Processor线程就是从这个newSessions queue里取出新加进来的session对象,以OP_READ注册到自己Selector中,然后针对自己的Selector进行select()操作,当有可读的socket事件响应时就取出socket数据,然后走Filter链直至业务逻辑。当有了response需要返回给客户端时,会将response对象封装并添加到session对象的一个写出缓冲里,然后session对象被添加到NioProcessor对象的flushingSessions中,Processor线程会从flushingSessions中取出session,然后从session的写出缓冲中取出response,写socket。需要注意的是,在处理写的时候还是比较细腻的,进入flushingSessions queue有两种方式,
- 一个是经过业务逻辑处理,产生response,直接将session压入flushingSessions queue;
- 另一个是当执行上述写socket动作时,因为网络闪断或其他什么原因,写socket不能正常完成,这时,会将session对应的channel在Selector中注册OP_WRITE事件,当Processor线程发现这个session对应的channel有OP_WRITE事件时,会重新将这个session压入flushingSessions queue,然后就是走正常逻辑了,看下代码感受一下;
- private boolean flushNow(S session, long currentTime) {
- final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req = null;
- try {
- ......
- do {
- req = session.getCurrentWriteRequest();
- if (req == null) {
- // session的写出缓冲中取response
- req = writeRequestQueue.poll(session);
- if (req == null) {
- break;
- }
- session.setCurrentWriteRequest(req);
- }
- int localWrittenBytes = 0;
- Object message = req.getMessage();
- if (message instanceof IoBuffer) {
- //写socket
- localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes, currentTime);
- if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
- //写socket成功,但是buffer中还有需要写的数据
- writtenBytes += localWrittenBytes;
- setInterestedInWrite(session, true);
- return false;
- }
- }
- ......
- //写socket出错,重置channel事件
- if (localWrittenBytes == 0) {
- setInterestedInWrite(session, true);
- return false;
- }
- ......
- } while (writtenBytes < maxWrittenBytes);
- } catch (Exception e) {
- ......
- }
- return true;
- }
- protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
- SelectionKey key = session.getSelectionKey();
- if (key == null) {
- return;
- }
- int newInterestOps = key.interestOps();
- //注册OP_WRITE事件
- if (isInterested) {
- newInterestOps |= SelectionKey.OP_WRITE;
- } else {
- newInterestOps &= ~SelectionKey.OP_WRITE;
- }
- key.interestOps(newInterestOps);
- }
在各个Selector的维护及session的超时控制等方面,mina在实现的细节上还有很多特别的设计,
相关推荐
Mina PPT 英文高级 Mina 架构设计PPT版
基于mina架构的JTT808协议演示版本程序,网关采用mina+spring+MQ/redis架构,简单配置application.properties中的MQ:jms.brokerurl=tcp://127.0.0.1:61616 需要正版可联系QQ:78772895
基于Mina架构开发的可配置的Socket Server,后台采用MySql数据库,可以独立app部署或Tomcat Servlet部署,包括Socket Server全部实现代码,后台MySql数据备份文件,Java测试代码,android端测试代码和iOS端测试代码...
该版本基于mina架构的JTT808协议演示版本程序,网关采用mina+spring+MQ/redis架构,简单配置application.properties中的 MQ:jms.brokerurl=tcp://127.0.0.1:61616; 终端连接对应的端口:3005, 点击bin/目录下面...
基于mina架构的JTT808协议演示版本程序,网关采用mina+spring+MQ/redis架构,简单配置application.properties中的 MQ:jms.brokerurl=tcp://127.0.0.1:61616; 终端连接对应的端口:3005, 点击bin/目录下面的bin/...
基于mina架构的一个文件上传程序,当中包含了mina的编码、解码,以及java的nio技术
高性能网络架构Mina视频免费下载_百度云盘资源
基于MINA架构实现的UDP接收服务器,支持对mysql数据库的连接池插入查找操作。数据接收采用MINA,处理使用线程池并结合数据库连接池实现对发送数据的储存。
同时,微信小程序应用 MINA 架构, 其采用“响应—绑定”的运行机制,开发人员仅需重点关 注系统的视图层(View)、逻辑层(App Service),就能 够完成整体开发 [1]。其中视图层主要应用类似于 HTML 和 CSS 的 WXML...
基于Mina的网络通讯,分为服务端和客户端。...研究selector NIO实现时,发现了这个架构。 Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个... 客户端/服务端框架(典型的C/S架构) 网络套接字(networking socket)类库 事件驱动的异步API(注意:在JDK7中也新增了异步API)
mina初学教程。最近使用Mina开发一个Java的NIO服务端程序... 客户端/服务端框架(典型的C/S架构) 网络套接字(networking socket)类库 总之:我们简单理解它是一个封装底层IO操作,提供高级操作API的通讯框架!
一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件...Apache Mina也称为: NIO框架 客户端/服务端框架(典型的C/S架构) 网络套接字(networking socket)类库
MINA框架的特点有:基于java NIO类库开发;采用非阻塞方式的异步传输;事件驱动;支持批量数据传输;支持TCP、UDP协议;控制反转的设计模式(支持Spring);采用优雅的松耦合架构;可灵活的加载过滤器机制;单元测试...
NIO是一个基于事件的IO架构,最基本的思想就是:有事件我通知你,你再去做你的事情,没事件时你大可以节约大把时间去做其它任何事情。而且NIO的主线程only one,不像传统的模型,需要N个线程去,也减轻了JVM的工作量...
客户端/服务端框架(典型的C/S架构) 网络套接字(networking socket)类库 事件驱动的异步API(注意:在JDK7中也新增了异步API) 总之:我们简单理解它是一个封装底层IO操作,提供高级操作API的通讯框架!
主要Mina框架的架构和使用流程做了一个详细的简介,方便用户和读者使用和下载
mina是一种高效的互联网网络通信架构,此资料为入门基础。
本文将通过一个简单的问候程序 HelloServer 来介绍 MINA 的基础架构的同时演示如何使用 MINA 开发网络应用程序。
主要屏蔽了网络通信的一些细节,对Socket进行封装,并且是NIO的一个实现架构,可以帮助我们快速的开发网络通信,常用于游戏的开发、中间件服务端的程序中。 Apache的Mina(Multipurpose Infrastructure Networked ...