netty是最近项目要用到的nio框架,找了各种资料,发现称赞它的有点多,所以决定用它:其实也就二选一嘛,mina或netty或自己写。对于mina,也不熟,不过看各种介绍,貌似netty干活还是很不错的,尤其是最新的4.x和5.x重构后,且使用结构清晰就先了解了解了。


首先要把应用跑起来啦(官网的例子比较多),我这是一个关于mqtt的一个例子:

 m_bossGroup = new NioEventLoopGroup();
m_workerGroup = new NioEventLoopGroup(); final NettyMQTTHandler handler = new NettyMQTTHandler();
handler.setMessaging(messaging); ServerBootstrap b = new ServerBootstrap();
b.group(m_bossGroup, m_workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));
pipeline.addFirst("idleStateHandler", new IdleStateHandler(, , Constants.DEFAULT_CONNECT_TIMEOUT));
pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());
//pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
pipeline.addLast("decoder", new MQTTDecoder());
pipeline.addLast("encoder", new MQTTEncoder());
pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
pipeline.addLast("handler", handler);
}
})
.option(ChannelOption.SO_BACKLOG, )
.option(ChannelOption.SO_REUSEADDR, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);
try {
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(Constants.PORT);
LOG.info("Server binded");
f.sync();
} catch (InterruptedException ex) {
LOG.error(null, ex);
}

再回想下,我们自己写serversocket的时候是怎么写的呢(这是一个笨拙的实例代码):

    ServerSocket socket;
channel = ServerSocketChannel.open(); // 打开通道
socket = channel.socket(); //得到与通到相关的socket对象
socket.bind(new InetSocketAddress(port)); //将scoket榜定在制定的端口上
//配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_ACCEPT);
try {
while (true) {
this.selector.select();
Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
this.handleKey(key); }
}
} catch (IOException ex) {
ex.printStackTrace();
}

原理还是那些,channel.open(),然后register key,然后遍历,再然后才进行handleKey()的干活。

那netty的写法为什么那么潇洒呢,怀着这个莫名的疑问,我先不管它的结构什么的,直接进行search,发现了这么个东东:

   NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}

其中第8行从名称上来看,有点点意思了,往下看:

  private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();

其中的provider就是我们熟悉的:java.nio.channels.spi.SelectorProvider类。

所以这个就是做了selector.open的工作。

接下来能看到NioEventLoop:

     protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();

再继续看,该类中处理的selectedKey:

         final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
} try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}

现在明白了吧,其实netty也是走这么一套逻辑。

然后再网上看,逻辑是这样:

NioEventLoopGroup extends MultithreadEventExecutorGroup,其初始化了n个单线程的线程池(children = new SingleThreadEventExecutor[nThreads];)

每个单线程的对象child[i]=NioEventLoop对象,每个NioEventLoop有一个Selector字段。

其run方法是该group都需要干活的具体业务逻辑代码。

后续再加上别的类说明。

05-11 22:05