Netty 学习(八):新连接接入源码说明

作者: Grey

原文地址:

博客园:Netty 学习(八):新连接接入源码说明

CSDN:Netty 学习(八):新连接接入源码说明

新连接的接入分为3个过程

  1. 检测到有新连接。

  2. 将新连接注册到 worker 线程。

  3. 注册新连接的读事件。

检测新连接的代码在NioEventLoop中的processSelectedKey()方法中

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        ......
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ......
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        .....
    }

启动一个 Netty 服务端和 Netty 客户端,在unsafe.read()这一行打断点,可以得到这里的unsafe就是NioMessageUnsafe,进入NioMessageUnsaferead()方法,

这个方法主要做的事情就是:创建,设置并绑定NioSocketChannel

     private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            ......
            do {
                        // 创建`NioSocketChannel`
                        int localRead = doReadMessages(readBuf);
                        ......
                    } while (continueReading(allocHandle));
          ......
                // 设置并绑定 NioSocketChannel
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

        ......
        }
    }

创建NioSocketChannel调用的是doReadMessages()方法,通过Debug,可以看到doReadMessage()来自于NioServerSocketChannel

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

可以看到此时调用的是 Java 底层的accept()方法,创建了一条 JDK 层面的Channel, Netty 将其封装成自定义的NioSocketChannel,并加入一个List

继续 Debug,进入 NioSocketChannel 的构造方法中,调用的是AbstractNioByteChannel的构造方法

    protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
        super(parent, ch, SelectionKey.OP_READ);
    }

这个方法类似在 NIO 编程中,注册 OP_READ 事件,表示 Channel 对读事件感兴趣。

接下来是设置并绑定NioSocketChannel,处理每个NioSocketChannel,通过 Debug 可以来到AbstractUnsaferegister0()方法

private void register0(ChannelPromise promise) {
                // 注册Selector
                doRegister();
                // 执行 handler
                pipeline.invokeHandlerAddedIfNeeded();

                // 传播 ChannelRegistered事件
                pipeline.fireChannelRegistered();
                
                // 注册读事件
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
}

这个方法主要完成的事情就是:

  1. NioSocketChannel注册到Selector

  2. 配置自定义的Handler

  3. 将连接注册事件传播下去,调用了每个HandlerchannelRegistered方法。

  4. 注册读事件。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码

10-05 10:32