回顾一下Selector,多路复用器Selector可以管理多个Channel,可以向Selector注册感兴趣的事件,当事件就绪,通过Selector.selector方法获取注册的事件,进行相应的操作。

Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
	Set<SelectionKey> keys = selector.selectedKeys();
	Iterator<SelectionKey> iterator = keys.iterator();
	while (iterator.hasNext()) {
	   SelectionKey key = iterator.next();
	   iterator.remove();
	   if (key.isAcceptable()) {
	   ...
	   } else if (key.isReadable() && key.isValid()) {
	   ...
	   }
	   keys.remove(key);
	}
 }
1. 实例化分析

一般我们获取Selector通过Selector.open()获取,

public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

Selector内部是通过SelectorProvider类的Provider方法实现。

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

这里使用单例模式,保证只有一个SelectorProvider。provider由sun.nio.ch.DefaultSelectorProvider.create()创建。create方法内部通过系统的名称来创建创建SelectorProvider,在这里创建了sun.nio.ch.EPollSelectorProvider

 public static SelectorProvider create() {
        String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
        }
    }

继续回到Selector的open()中,获取到SelecProvider实例之后,继续调用openSelector(),很自然进入EPollSelectorProvider的openSelector()方法:

public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorProvider(this);
  }

进入EPollSelectorProvider的构造函数

 EPollSelectorImpl(SelectorProvider var1) throws IOException {
        super(var1);
        long var2 = IOUtil.makePipe(false);
        this.fd0 = (int)(var2 >>> 32);
        this.fd1 = (int)var2;
        this.pollWrapper = new EPollArrayWrapper();
        this.pollWrapper.initInterrupt(this.fd0, this.fd1);
        this.fdToKey = new HashMap();
    }

初始化的主要步骤如下:

  • 首先通过IOUtil.makePipe(false)返回了一个非堵塞的管道(pipe),底层是通过Linux的pipe系统调用实现的;返回两个int常量,分别指向管道的读、写文件描述符。读端在高32位,写端在低32位
  • 接着定义了EPollArrayWrapper变量,并调用initInterrup方法,将fd0注册到epoll;
  • 最后定义了一个map类型的变量fdToKey,将channel的文件描述符ID和SelectionKey建立映射关系,SelectionKey中保存了Channel Selector 感兴趣的事件。

EpollArrayWapper is what?
EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用,EpollArrayWapper类中有几个比较重要的Navtive方法:

private native int epollCreate();
private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);
private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;

看起来似乎比较眼熟,好像和Epoll的三个方法有些雷同,没错这三个native方法正是对上述epoll系列系统调用的包装。

2. Resigter

下面分析serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)的底层逻辑

public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
	SelectionKey k = findKey(sel);
	if (k != null) {
		k.interestOps(ops);
		k.attach(att);
	}
	 if (k == null) { // New registration synchronized (keyLock) { 
		 if (!isOpen()) throw new ClosedChannelException();
		 k = ((AbstractSelector)sel).register(this, ops, att); addKey(k);
	 }
	  	} return k;
	 }
 }
  • 如果该channel和selector已经注册过,则直接添加事件和附件。
  • 否则通过selector实现注册过程。
    继续调用SelectorImp的register方法:
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
	SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
	 k.attach(attachment);
	 synchronized (publicKeys) {
		 implRegister(k);
	 }
	 k.interestOps(ops);
	 return k;
  }
  1. 以当前channel和selector为参数,初始化SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。

  2. implRegister(k)将channel注册到epoll中

  3. k.interestOps(int) 的内部调用了nioInterestOps:

public SelectionKey nioInterestOps(int var1) {
            this.channel.translateAndSetInterestOps(var1, this);
            this.interestOps = var1;
            return this;
    }
  • translateAndSetInterestOps方法会将注册的感兴趣的事件和其对应的文件描述存储到EPollArrayWrapper对象的eventsLow或eventsHigh中,这是给底层实现epoll_wait时使用的。以SeverSocketChannel的实现为例。
 public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {
       int var3 = 0;
       if ((var1 & 16) != 0) {
           var3 |= Net.POLLIN;
       }
       var2.selector.putEventOps(var2, var3);
   }
   //EpollSelector中
    public void putEventOps(SelectionKeyImpl var1, int var2) {
         SelChImpl var3 = var1.channel;
         this.pollWrapper.setInterest(var3.getFDVal(), var2);
    }
  • 同时该操作还会将设置SelectionKey的interestOps字段,这是给我们程序员获取使用的。

EPollSelectorImpl.implRegister:

protected void implRegister(SelectionKeyImpl var1) {
     SelChImpl var2 = var1.channel;
     int var3 = Integer.valueOf(var2.getFDVal());
     this.fdToKey.put(var3, var1);
     this.pollWrapper.add(var3);
     this.keys.add(var1);
}
  • 将channel对应的fd(文件描述符)和对应的SelectionKeyImpl放到fdToKey映射表中。
  • 将channel对应的fd(文件描述符)添加到EPollArrayWrapper中,并强制初始化fd的事件为0 ( 强制初始更新事件为0,因为该事件可能存在于之前被取消过的注册中。)
  • 将selectionKey放入到keys集合中。

3. Select

Selector.select方法最终调用的是doSelect方法:

protected int doSelect(long paramLong) throws IOException{
	//处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
	processDeregisterQueue();
	try {
		begin();
		this.pollWrapper.poll(paramLong);
	} finally {
		end();
	}
	processDeregisterQueue();
	int i = updateSelectedKeys();
	if (this.pollWrapper.interrupted())	{
		this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
		synchronized (this.interruptLock) {
			this.pollWrapper.clearInterrupted();
			IOUtil.drain(this.fd0);
			this.interruptTriggered = false;
		}
	}
	return i;
}

未完!

10-03 14:08