回顾一下Selector,多路复用器Selector可以管理多个Channel,可以向Selector注册感兴趣的事件,当事件就绪,通过Selector.selector方法获取注册的事件,进行相应的操作。
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
...
} else if (key.isReadable() && key.isValid()) {
...
}
keys.remove(key);
}
}
1. 实例化分析
一般我们获取Selector通过Selector.open()获取,
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
Selector内部是通过SelectorProvider类的Provider方法实现。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
这里使用单例模式,保证只有一个SelectorProvider。provider由sun.nio.ch.DefaultSelectorProvider.create()
创建。create方法内部通过系统的名称来创建创建SelectorProvider,在这里创建了sun.nio.ch.EPollSelectorProvider
public static SelectorProvider create() {
String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (var0.equals("SunOS")) {
return createProvider("sun.nio.ch.DevPollSelectorProvider");
} else {
return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
}
}
继续回到Selector的open()中,获取到SelecProvider实例之后,继续调用openSelector(),很自然进入EPollSelectorProvider的openSelector()方法:
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorProvider(this);
}
进入EPollSelectorProvider的构造函数
EPollSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
long var2 = IOUtil.makePipe(false);
this.fd0 = (int)(var2 >>> 32);
this.fd1 = (int)var2;
this.pollWrapper = new EPollArrayWrapper();
this.pollWrapper.initInterrupt(this.fd0, this.fd1);
this.fdToKey = new HashMap();
}
初始化的主要步骤如下:
- 首先通过IOUtil.makePipe(false)返回了一个非堵塞的管道(pipe),底层是通过Linux的pipe系统调用实现的;返回两个int常量,分别指向管道的读、写文件描述符。读端在高32位,写端在低32位
- 接着定义了EPollArrayWrapper变量,并调用initInterrup方法,将fd0注册到epoll;
- 最后定义了一个map类型的变量fdToKey,将channel的文件描述符ID和SelectionKey建立映射关系,SelectionKey中保存了Channel Selector 感兴趣的事件。
EpollArrayWapper is what?
EpollArrayWapper将Linux的epoll相关系统调用封装成了native方法供EpollSelectorImpl使用,EpollArrayWapper类中有几个比较重要的Navtive方法:
private native int epollCreate();
private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);
private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;
看起来似乎比较眼熟,好像和Epoll的三个方法有些雷同,没错这三个native方法正是对上述epoll系列系统调用的包装。
2. Resigter
下面分析serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)的底层逻辑
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) { // New registration synchronized (keyLock) {
if (!isOpen()) throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att); addKey(k);
}
} return k;
}
}
- 如果该channel和selector已经注册过,则直接添加事件和附件。
- 否则通过selector实现注册过程。
继续调用SelectorImp的register方法:
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
以当前channel和selector为参数,初始化SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。
implRegister(k)将channel注册到epoll中
k.interestOps(int) 的内部调用了nioInterestOps:
public SelectionKey nioInterestOps(int var1) {
this.channel.translateAndSetInterestOps(var1, this);
this.interestOps = var1;
return this;
}
- translateAndSetInterestOps方法会将注册的感兴趣的事件和其对应的文件描述存储到EPollArrayWrapper对象的eventsLow或eventsHigh中,这是给底层实现epoll_wait时使用的。以SeverSocketChannel的实现为例。
public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {
int var3 = 0;
if ((var1 & 16) != 0) {
var3 |= Net.POLLIN;
}
var2.selector.putEventOps(var2, var3);
}
//EpollSelector中
public void putEventOps(SelectionKeyImpl var1, int var2) {
SelChImpl var3 = var1.channel;
this.pollWrapper.setInterest(var3.getFDVal(), var2);
}
- 同时该操作还会将设置SelectionKey的interestOps字段,这是给我们程序员获取使用的。
EPollSelectorImpl.implRegister:
protected void implRegister(SelectionKeyImpl var1) {
SelChImpl var2 = var1.channel;
int var3 = Integer.valueOf(var2.getFDVal());
this.fdToKey.put(var3, var1);
this.pollWrapper.add(var3);
this.keys.add(var1);
}
- 将channel对应的fd(文件描述符)和对应的SelectionKeyImpl放到fdToKey映射表中。
- 将channel对应的fd(文件描述符)添加到EPollArrayWrapper中,并强制初始化fd的事件为0 ( 强制初始更新事件为0,因为该事件可能存在于之前被取消过的注册中。)
- 将selectionKey放入到keys集合中。
3. Select
Selector.select方法最终调用的是doSelect方法:
protected int doSelect(long paramLong) throws IOException{
//处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
processDeregisterQueue();
try {
begin();
this.pollWrapper.poll(paramLong);
} finally {
end();
}
processDeregisterQueue();
int i = updateSelectedKeys();
if (this.pollWrapper.interrupted()) {
this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
synchronized (this.interruptLock) {
this.pollWrapper.clearInterrupted();
IOUtil.drain(this.fd0);
this.interruptTriggered = false;
}
}
return i;
}