都有java nio的实现是通过reactor pattern 来实现的有说明。java nio作为一种跨平台IO操作。在不同平台上面封装了对应平台的IO模型。
在reactor pattern 作者中已经提及,通过reactor pattern 模式可以来实现跨平台操作。所以,java nio通过reactor pattern模式就是这样完成的。
java nio在window 平台下面是使用Select 模型。对于java nio源代码的分析,对于如果理解reactor pattern的设计模式意义不大。因为java nio实现中
需要对JNI的封装。如果要了解对于不同平台的封装,可以通过ZThead库来了解会有更大的意义。这样可以避免对JNI 的干扰。因为JNI涉及到脚本语言java和C/C++交互的知识。
How to Build a Scalable Multiplexed Server With NIO
Reactor Pattern Mapped to NIO
Handle
SelectionKey
Event
SelectionKey.OP_READ,etc
Demultiplexer
Selector
Dispatcher
Selector.select() + iterate Selector.selectedKeys()
Handler
An instance of Runnable or Callable
最简单例子:TestReactor.java
- public class TestReactor
- {
- public static void main(String[] args) throws Exception
- {
- //创建serversocketchannel通道.
- ServerSocketChannel serversocketchannel =ServerSocketChannel.open();
- //设置非阻塞,异步模式
- serversocketchannel.configureBlocking(false);
- //关联的serversocket
- ServerSocket serversocket = serversocketchannel.socket();
- SocketAddress endpoint =new InetSocketAddress("127.0.0.1",8888);
- //绑定指定的端口
- serversocket.bind(endpoint);
- //创建Selector。在Reactor Pattern模式中,相当于Demultiplexer 作用,用来多路复用器
- Selector sel = Selector.open();
- //在select中注册链接事件。
- //在reactor 模式中SelectionKey 相当于event事件。
- //在SectionKey中存在OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT 事件类型。此时与OP_ACCEPT 关联的Channel为ServerSocketChannel
- SelectionKey selKey = serversocketchannel.register(sel,SelectionKey.OP_ACCEPT);
- while(true)
- {
- //进行阻塞操作,等待事件的到来。返回值在select 模型中表示完成操作的数目
- int selCount = sel.select();
- if(selCount>0)
- {
- System.out.println("selCount=>>"+selCount);
- }
- //返回可以操作的键集合。在window select 模型中,返回可以操作的fd_set集合
- Set<SelectionKey> selKeySet = sel.selectedKeys();
- for(SelectionKey key:selKeySet)
- {
- //在SelectionKey中,存在链接可以接受事件,则调用accept()函数就不会存在阻塞现象。
- //select
- if(key.isAcceptable())
- {
- //获取与SelectionKey.OP_ACCEPT关联的通道。即ServerSocketChannel.
- ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
- //调用ServerSocketChannel 不会发生阻塞。获取到客户链接
- SocketChannel socketchannel = serverChannel.accept();
- //设置阻塞模式
- socketchannel.configureBlocking(false);
- //关联SocketChannel的读和写事件
- socketchannel.register(sel,SelectionKey.OP_READ|SelectionKey.OP_WRITE);
- //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
- }
- if(key.isWritable())
- {
- SocketChannel socketchannel = (SocketChannel)key.channel();
- ByteBuffer src =ByteBuffer.allocate(100);
- src.putInt(100);
- src.flip();
- socketchannel.write(src);
- //关联SocketChannel的读和写事件
- socketchannel.register(sel,SelectionKey.OP_READ);
- //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
- }
- if(key.isReadable())
- {
- SocketChannel socketchannel = (SocketChannel)key.channel();
- InetSocketAddress remote = (InetSocketAddress)socketchannel.getRemoteAddress();
- String remotestring = remote.getHostString()+remote.getPort();
- //关联SocketChannel的读和写事件
- socketchannel.register(sel,SelectionKey.OP_WRITE);
- //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
- }
- selKeySet.remove(key);
- }
- }
- }
- }