echo server 再解析
再看 echoserver 的代码流程:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(5);
很明显,分别构建主从 Reactor组。主 Reactor 组内的 Reactor 数量为1,表示我们的 echoserver 目前只对外监听一个端口。
ServerBootstrap b = new ServerBootstrap();
简单看下 ServerBootstrap 代码,继承自 AbstractBootstrap:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
其实很规律,子类ServerBootstrap维持child(即从 Reactor)的配置,主类AbstractBootstrap维持主 Reactor 的配置。
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
上述代码对 ServerBootstrap 配置参数,方便接下来的启动,即:
ChannelFuture f = b.bind(PORT).sync();
很自然,我们进入 bind ,这里就到了 AbstractBootstrap 的 doBind 方法:
private ChannelFuture doBind(final SocketAddress localAddress)
doBind 流程详解
final ChannelFuture regFuture = initAndRegister();
很明显是异步执行,我们应该对 ChannelFuture 比较熟悉了。那么 initAndRegister 中执行为:
channel = channelFactory.newChannel();
init(channel);
这里也比较熟悉了,注意是创建的 NioServerSocketChannel(或者对应NIO中的 ServerSocketChannel),然后将要对其初始化:
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
比较简单,主要是对 NioServerSocketChannel 的 pipeline 添加一个业务处理器。联系上一篇文章,很容易想到就是当客户端连接接入后,主 Reactor 会将调用这个pipeline中的执行单元。
但是要注意,这里调用 addLast 向 pipeline 中添加handler时,addLast 内部还有一个特殊逻辑,如下:
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
如果还没有注册,就会在 pipeline 中构造一个 PendingHandlerAddedTask(我们后文会遇到),其中自然维持着pipeline 和 ChannelHandlerContext 的引用。再说明一下,pipeline 链表中的元素就是:ChannelHandlerContext 。
ChannelFuture regFuture = config().group().register(channel);
再回到 initAndRegister ,这里也是异步去执行 register 操作。所以我们跟着去看一下,注意这里的 config().group() 都是在 AbstractBootstrap 中执行,同时意味着group 是主 Reactor 的group。
next().register(channel);
next()是很正常的,因为 group 中可能存在多个 Reactor,那么我们选择哪个 Reactor 来执行就需要一个算法来决定。接着看:
NioServerSocketChannel.unsafe().register(this, promise);
上述代码改了点,只是说明意思:是继续使用 NioServerSocketChannel 对应的 unsafe 类执行注册操作,当然也是异步的。并且这里传入了 promise,这也就意味着内部的流程中会修改promise状态,以实现回调。unsafe类一听就和 java里的一样,是执行底层操作的。我们接着看:
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
这里的 eventLoop 自然还是 主Reactor,当前线程是main,所以会走到 else方法体中。可见,这里直接将 register0 作为一个异步任务传给 eventLoop 执行,并且传入 promise(意味着会在内部设置状态,从而进行回调)。
值得注意的是,eventLoop.execute() 执行的内部,就会将 主Reactor(NioEventLoop) 启动起来,也就是启动了 Reactor 的核心循环。核心循环里面处理哪些东西,我们放到之后讲解。而且,经过之前文章的学习,这里应该有印象。核心循环中一个重要的步骤就是处理任务队列中的异步任务,也就是这里的 register0。
我们先认为 register0 还没有执行,因为异步的关系,先来回顾前面的 doBind() 流程:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
非常清晰,regFuture 还没执行完毕,所以进入 else代码块。代码块中给 regFuture 设置回调方法,意味着当上文异步的 register0 执行完毕执行回调后,会紧接着执行 doBind0 方法。
而现在的 doBind 方法中,会直接返回 PendingRegistrationPromise,main线程会持续等待,直到 bind流程执行完毕:
hannelFuture f = b.bind(PORT).sync();
那么我们接下来再回过头去看下 register0 和 doBind0 的流程。
register0流程
再回到 register0 的流程,这里只按照执行过程来说明,而不按照方法来划分。注意,这里已经是 主Reactor 的流程了,循环中处理异步task任务,也即这里的 register0。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
javaChannel 表示 NioServerSocketChannel 对应的 javaNIO中的ServerSocketChannel,调用其 register 方法,将 selector(IO 多路复用器) 传入,设置监听事件为0,并且设置附带信息 attachment为this(即 NioServerSocketChannel)。
方法返回 SelectKey,可以看下包含哪些:而这些信息就是之后会从 SelectKey 中获取到的。
selectionKey = {SelectionKeyImpl@1847}
channel = {ServerSocketChannelImpl@1828} "sun.nio.ch.ServerSocketChannelImpl[unbound]"
selector = {KQueueSelectorImpl@1850}
index = 0
interestOps = 0
readyOps = 0
valid = true
attachment = {NioServerSocketChannel@1440} "[id: 0x93bcb693]"
以上是我电脑环境中此时的 SelectKey,可以看到监听事件 interestOps 为0。
这里经过 Register后,已经将 NioServerSocketChannel attach在主Reactor中selector出来的selectionKey里了。
pipeline.invokeHandlerAddedIfNeeded();
该语句内部就会调用 init 中放入 pipeline 中的 ChannelInitializer 方法。这里就用到了上文提到的 PendingHandlerAddedTask 来关联这个待处理的 ChannelInitializer:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
并且,会在调用完毕后,执行 remove将其移除掉:
try {
initChannel((C) ctx.channel());
...
}
finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
}
然后执行如下方法:
safeSetSuccess(promise);
就会回调:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
很明显还是提交异步任务到 主Reactor 的任务队列中。提交完成后,又回到 register0 的流程中:
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
也就是依次调用 Pipeline 中的各 handler 去执行 channelRegistered 方法。
doBind0流程
那么 bind 的真正执行就是在 主Reactor 从任务队列中获取到 doBind 任务后触发,所以会执行:
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
这里的bind事件也是要依次执行 Pipeline 上的handler链,只不过这次是从 尾部tail开始,追踪调用过程,是:
- 调用 LogginHandler 的bind方法
- 调用 head handler 的bind方法,此时方法内执行 unsafe.bind()
而 unsafe 的 bind 就是调用NIO的bind过程。并在调用完成后设置 promise,main线程就会继续执行bind后的操作:
ChannelFuture f = b.bind(PORT).sync();
System.out.println("bind is done");
f.channel().closeFuture().sync();
也就是一直阻塞等待直到服务器socket关闭。
但是在 unsafe的bind 操作中,又提交了一个异步任务,如下:
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
也就是将 channelActive 事件在 pipeline 中传播,而当 Head Handler处理该事件时,又会在产生 read 事件,如下:
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
而 read 事件是 outbound类型,所以会从 pipeline 的尾部开始传递。当再次传到 head时,会调用到 unsafe 的 read 方法,然后内部执行过程如下:
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
上述代码就是添加 OP_ACCEPT 事件到 selector,让其监听。
这样,服务端就已经绑定好了端口,并等待客户端接入了。
|