前言
想学习netty的话,需要两个知识做支撑,一个是io多路复用,一个是reactor反应堆; 下面两篇文章是我认为非常细致清楚的对上面两个知识点的讲解; io多路复用 reactor反应堆模型
netty使用的是主从reactor模型,主reactor负责处理连接事件,创建channel,并监听io事件,如果有io事件产生,会将channel注册到从reactor上,从reactor会挑选工作线程来进行处理;
源码解析
整个服务的启动过程表面看起来还是比较简单的; 首先是初始化boss工作组和worker工作组,其次通过引导类来根据你的服务的需要进行配置,最后绑定端口服务启动; netty的源码比较晦涩,内部的继承关系十分复杂,希望读着可以打开源码通过debug调试的方式跟着我的脚步一起走; debug启动的时候可能会报错,需要先用maven compile 编译下common包,里面有几个template模板,编译成功后在启动main方法;
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
事件循环组(EventLoopGroup)的初始化
上面的代码看完,相信大家已经有一定理解,我们先不必纠结内部的实现过程,这里我们只需要了解整个启动过程即可,其内部的实现,我会带大家逐步拆解; 首先拆解第一个问题,也就是工作组的初始化,上面代码中boss和worker的初始化过程; 可以注意掉,echoServer中用的是NioEventLoopGroup,这里大家不必纠结,不同服务类型和需求你要使用的工作组可能不同,但是他们的原理都大同小异,我们还是以NioEventLoop为例进行解析;
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
我们进入到NioEventLoopGroup中,下面是上面第一步中调用的构造器;我只需要沿着这个构造器一直追踪下去即可;
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
可以看到,相比上一步多了个selectorProvider,我们可以进去看一下
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
到了这里,loadProviderFromProperty和loadProviderAsService应该是让我们自己来提供provider,如果都没有的话会给我们一个nio的默认provider,即DefaultSelectorProvider.create();
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}
可以看到这里的provider是一个KQueueSelectorProvider,我们继续进入;
public class KQueueSelectorProvider extends SelectorProviderImpl {
public KQueueSelectorProvider() {
}
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}
}
注意,这里的openSelector我们后面会用到,其含义就是通过provider为们提供一个selector实例,来轮训io事件; 接下来我们回到刚才的内容,这里我在贴一下这部分代码;
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
我们继续深入;
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
可以看到又多了一个DefaultSelectStrategyFactory,大家可以进去看一下,代码不复杂,里面是一个单例模式的一个轮训策略工厂; 继续深入;
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
线程池的拒绝策略,没什么好说的,我们继续;
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
接下来我们来到了NioEventLoopGroup的父类MultithreadEventExecutorGroup,这对线程数做了一下处理,继续;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
DefaultEventExecutorChooserFactory是给工作组用来挑选处理线程的,先略过,后面会说; 到这里,我们迎来了初始化NioEventLoopGroup的关键地方;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
checkPositive(nThreads, "nThreads");
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
我们逐步拆解上面的代码,先看这里:
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
还记得我们前面在new NioEventLoopGroup的时候,executor传入的是null嘛,这里就对这个null做了处理;我们进入ThreadPerTaskExecutor看一下;
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
我们注意到,这里是维护了一个线程工厂,并且一旦有任务需要执行,也就是调用execute方法的时候,就会通过工厂创建一个线程,并立刻启动; 到这里,对于executor的处理就结束了,我们先不要纠结这里,继续往下走;
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
这里我们需要重点关注的有两个部分,一个是newChild方法,一个是newChooser; 我们先说newChild;
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
SelectorProvider selectorProvider = (SelectorProvider) args[0];
SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
EventLoopTaskQueueFactory taskQueueFactory = null;
EventLoopTaskQueueFactory tailTaskQueueFactory = null;
int argsLength = args.length;
if (argsLength > 3) {
taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
}
if (argsLength > 4) {
tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
}
return new NioEventLoop(this, executor, selectorProvider,
selectStrategyFactory.newSelectStrategy(),
rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
}
这里newChild是个模板方法,我们最初是new了一个NioEventLoopGroup,所以我们自然要到NioEventLoopGroup中来,我们可以看到,前面是一些属性的赋值,最后面返回了一个NioEventLoop,这个NioEventLoop就是NioEventLoopGroup中的主要成员了;我们进入看一下;
public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
到这里,我们到了当初的那个openSelector方法,并通过selectorTuple获得到了selector并注入到了NioEventLoop中;但是值得注意的是NioEventLoop继承的父类!
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
}
我们在进入到SingleThreadEventExecutor就明白了,其实NioEventLoop本质上就是个单线程的线程池+selector; 到这里,NioEventLoopGroup中的child都创建好了,那么接下来就是chooser了,我们上面提到过的那个;
这里我再贴一下刚才说的chooser工厂; 我们可以进入到这个工厂里面看一下;
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
chooser = chooserFactory.newChooser(children);
结合这两句我们可以看到,针对children的长度的奇偶数,采用不同个两种挑选线程的策略;
到了这里,NioEventLoopGroup的初始化就基本ok了,后续只是一些监听器,不是本次讨论的重点;
我们其实现在已经有个清晰的结构了,一个NioEventLoopGroup是有多个NioEventLoop和一个chooser组成的,NioEventLoop本质是一个单线程的线程池,并且在内部维护了一个Selector;chooser则是为了选择哪个NioEventLoop来执行任务的选择器; 我们回到最初
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
现在我们可以理解reactor的主从模型是如何在netty中实现的了,boss的children长度是1,也就是有一个NioEventLoop,worker则是有多个;
引导类配置
截至现在,我们仅仅走完了工作组的初始化部分,下面开始引导类的加载;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
首先,new了一个ServerBootStrap,我们先从它入手
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
childAttrs.putAll(bootstrap.childAttrs);
}
}
注意,我们是直接new的空构造器,但是有个地方我们需要注意,就是上面的成员变量里面有个ServerBootStrapConfig,这个比较重要,这个config向构造器里传了我们当前的引导类,也就是说,这个config是能够持有我们当前引导类中的属性的,后面我们就会发现,这里不做详细解释,接下来开始属性注入;
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
group方法可以请读者自己看一下,没有太多值得说的,就是将boss和worker的工作组告知引导类,我们继续看channel方法;
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
这里可以看到,像channelFactory中注入了一个ReflectiveChannelFactory,通过语义我们可以知道这个通过反射来获取channel的工厂;另外,关于channelFactory()这个方法,我们进入最里层,就是将ReflectiveChannelFactory赋值给serverBootStrap中的channelFactory属性;继续,option也是简单的属性注入,这里不是重点,略过;来到handler,这里注入的LoggingHandler需要我们关注一下,它将引出一个pipeline调用链的概念;
public class LoggingHandler extends ChannelDuplexHandler {
}
我们看一下这个ChannelDuplexHandler;
public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {
}
可以看到,这个类既是一个inBound处理器,也是一个OutBound处理器,在netty中,连接创建之后,所有入站和出站的请求在通过channel的时候都会经过pipeline的调用链,这个调用链分为inBound和outBound两种,分别对应入站和出站时候需要执行的处理内容,并且这两中handler都是在同一个pipeline链路中的,那么我们就可以将ChannelDuplexHandler理解为,这个handler是一个出站和入站都要走的handler,那么这样一来我们也可以理解这个loggingHandler的意义了; 这部分的最后,是childrenHandler的注入,这个至关重要,childrenHandler是不能不注入的,否则会报错,读者可以自己试一下把childrenHandler去掉,原因两个,childrenHandler是给worker进行实际处理用的,不加显然不合理,另外,在boss中的线程启动后,会通过执行childrenHandler去启动worder中的线程,这个后面我们再说;我们先看childrenHandler的注入;
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
ChannelInitializer是一个channel的一个启动器,里面提供了worker工作组channel的pipeline调用链的初始化; 到了这里读者可能就不理解为什么这里要这么做,实际上,当boss通过轮询建立连接的时候会注册一个channel,但是具体的逻辑处理是交给worker来做的,所以当交给worker之前,需要让worker注册boss持有的channel,并将worker需要的handler加入到其中的pipeline中,这里的p.addLast读者可以自行看一下,后面会用到,如果没有看懂,我在后面也会给出讲解;我们可以继续往下看,后面我们自然就清楚了;
启动服务
接下来,我们要进入最重要的部分,启动服务;
ChannelFuture f = b.bind(PORT).sync();
就这么简单的一句话,我们先进入bind方法,这里是主要是我们要关注的重点;
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
这里通过端口创建了套接字scoket,我们继续;
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
显而易见,doBind,继续;
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
这里我们有两个方法需要关注,其他可以忽略: initAndRegister(); doBind0(); 我们先看initAndRegister;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
首先是channelFactory来new了一个channel,还记得刚才提到的ReflectiveChannelFactory么,这里就用到了;这个channelFactory就是ReflectiveChannelFactory,我们可以看一下ReflectiveChannelFactory中对newChannel的实现;
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
'(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
}
}
显而易见,通过反射为我们创建了我们当初传给引导类的NioServerChannel,我这里再贴一下之前的解释,以防大家忘了;
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
我们需要关注一下这个channel的实例化过程,通过反射调用的是NioServerChannel的无参构造器;
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
继续深入;
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
config赋值,这里我们不深究,我们要关注的是super;
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
boolean inputShutdown;
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}
现在我们来到了AbstractNioMessageChannel,继续;
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
}
继续super;
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
那么现在,到了关键地方了,newChannelPipeline(),到了这里我们找到了关键点,在创建channel的同时会为channel创建自己的pipeline;感兴趣可以进去看看,里面是个带有head和tail节点(头尾节点)的链表;至此,我们获得了channel的实例了,接下来我们回到initAndRegister中; 另外,这里的newUnsafe后面会用到,读者可以先留个印象;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
我们看init方法;
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里获取到了当前NioServerChannel的pipeline,同时为其添加一个handler也就是ChannelInitializer,其实ChannelInitializer也是一个InBoundHandler,看一下他的父类就知道了; 接下来我们看一下ChannelInitializer要做什么,ChannelInitializer获取了当前的pipeline,并向里面又加入了一个config的handler,并调用当前工作组的线程去添加一个acceptor,想知道这两部都做了什么,我们继续往下看; 先来看一下config.handler,还记得我们前面说到,在new ServerBootStrap时被初始化的config么;
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
childAttrs.putAll(bootstrap.childAttrs);
}
}
我们看这句话
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
config持有的是我们当前的serverBootStrap,那么对应的handler就是我们在最开始注入的handler,也就是loggingHandler;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(serverHandler);
}
});
那么,当pipeline中的ChannelInitializer被执行的时候,loggingHandler就会被注入到pipeline中,acceptor也同理,接下来我们看一下acceptor是什么;
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
private final Runnable enableAutoReadTask;
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
}
目前我们似乎看不出这个类具体是怎么用的,那么我们先往后看;
截至现在为止,channel的实例化和初始化都完成了;注意,我们现在还在initAndRegister中,接下来我们要看下一个方法也就是注册channel;
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
重点来了;
ChannelFuture regFuture = config().group().register(channel);
config不必多说,还是那个在serverBootStrap实例化的时候初始化的config,即serverBootStrapConfig; 这里的group方法是它父类的方法;
public final EventLoopGroup group() {
return bootstrap.group();
}
还记的我们当时在调用serverBootStrap方法的时候,调用的b.group(boss, worker)方法么,这个bossLoopGroup就是放到serverBootStrapConfig的父类中的,那么此时group()获取的也就是boss工作组; 接下来调用boss工作组的register方法; boss工作组中是NioEventLoopGroup,还记得前面我们说NioEventLoopGroup的父类是MultithreadEventLoopGroup么;
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
}
这个register方法就是他父类的方法;
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
我们继续看next;
public EventLoop next() {
return (EventLoop) super.next();
}
继续;
public EventExecutor next() {
return chooser.next();
}
ok,到了这里我们基本就明白了,之前我们注册的那个chooser在这里开始发挥作用了,通过next方法从当前NioEventLoopGroup的children数组中挑一个NioEventLoop来注册这个channel;也就是boss工作组中唯一的那个NioEventLoop;而NioEventLoop的父类是SingleThreadEventLoop;那么此时这个register就是SingleThreadEventLoop中的register方法;
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
这里new了一个promise,把NioServerSocketChannel和当前的SingleThreadEventLoop注入进去了,也就是说接下来这个promise将持有前面说的这个两个对象;
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
进入到register中,我们发现promise获取到了前面在初始化时注入的NioServerSocketChannel,接下来调用了一个unsafe(),还记得前面我们在实例化channel的时候提到的unsafe么;
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
我们回到这个类去看一下这个unsafe,这个newUnsafe是个模板方法,我们当前使用的是NioServerSocketChannel,所以我们去NioServerSocketChannel中找这个方法,但是NioServerSocketChannel中找不到,我们去他的父类里找;
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
我们会在父类AbstractNioMessageChannel中找到这个方法,NioMessageUnsafe是AbstractNioMessageChannel的一个静态内部类;我们暂时不看这个类的内部方法,先回到刚才的地方;
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
接下来是register,也就是NioMessageUnsafe的register方法;最终我们会在它的父类AbstractUnsafe中找到register方法;
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
相信看到现在,读者应该也发现了,xxx0的方法往往就是netty中重要的逻辑,我们直奔主题;
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
这里有两个我们需要重点关注的方法,一个是doRegister,一个是invokeHandlerAddedIfNeeded; 先看一下doRegister;
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
javaChannel,其实就是我们一开始注入的NioServerSocketChannel,不理解的话可以进去看看,这里就不详细追踪了,接下来register方法返回了一个selectionKey,register的内容大致是将channel注册到了这个selector上,可以理解为,接下来这个selector就可以去监听这个channel了;
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
接下来就是invokeHandlerAddedIfNeeded方法了,这个方法决定了当前的boss工作组和后面的worker工作组是如何启动的;
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
我们看callHandlerAddedForAllHandlers方法;
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
好,到了这里,最关键的步骤就来临了,这部分的篇幅会比较长,读者可以重新捋一下前面说的再来看; 首先,我们看到了PendingHandlerCallback,还记得我说的那个p.addLast么,如果你当时看了里面的实现,应该就不会陌生了;
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
我们现在可以看看这个addLast了; 经过追踪(追踪比较简单,就不详细说了),我们会来到这里;
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
我们重点关注的地方在这里;
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
我们可以看看这个register在什么时候变成true的,这个很容易追踪,最后发现,register变成true的时候是在我们刚才说道的这里;
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
那么这样的话,这个方法的调用时机,显然是在addLast的后面的,也就是说,addLast在那个时候register一定是false,也就一定会走到下面的代码块里的callHandlerCallbackLater方法;
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
那么这里,大家应该就明白了,ctx也就是我们的handler,他被封装成了PendingHandlerCallback,并且是PendingHandlerAddedTask;那么我们就可以回到这个方法中了;
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
显而易见,重点肯定是task.execute();我们可以很轻松的追踪PendingHandlerAddedTask的execute方法;
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
atomicRemoveFromHandlerList(ctx);
ctx.setRemoved();
}
}
}
还记得我们的Executor是哪个么,NioEventLoop的父类,SingleThreadEventLoop;最终我们会在SingleThreadEventLoop的父类SingleThreadEventExecutor中找到这个方法的具体实现;
public void execute(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
我们继续进入execute;
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
首先addTask,进去就可以看到是在NioEventLoop的父类中维护的队列中添加任务(队列在new NioEventLoop的初始化过程中会把队列初始化); 接下来就是重点了,startThread;这里我做一个简单的讲解,以便读者能有个大概的认识;这个方法中,将boss工作组的线程启动,并去执行掉boss工作组中注册的channel中pipeline装载的调用链; 我们继续看;
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
除去一些对线程状态的判断,doStartThread显然是我们要关注的重点;
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
for (;;) {
if (confirmShutdown()) {
break;
}
}
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
confirmShutdown();
} finally {
try {
cleanup();
} finally {
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
可以看到,SingleThreadEventExecutor开始执行任务了,这里面我们需要关注的其实只有
SingleThreadEventExecutor.this.run();
this.run;我们不难知道,run是NioEventLoop的方法,所以我们进入NioEventLoop的run方法中;
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
default:
}
} catch (IOException e) {
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
}
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0);
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) {
selectCnt = 0;
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
这是个死循环,大致的工作就是selector去轮询,通过strategy判断是否轮询到io事件,如果轮询到了,通过processSelectedKeys进行处理(这个方法我们要注意一下,后面会用到,而且很关键),再对当前NioEventLoop下的task中的任务进行处理,也就是我们的handler,即runAllTask;runAllTask读者可以自己看,内容很简单;到现在我们还没有串联起完整的启动过程,我们先暂停回顾一下;
首先,boss工作组,通过注册channel并向pipeline中装填handler; 接下来,向selector中注册channel,这样select就可以监听channel中的事件了; 然后,boss工作组,通过调用invokeHandlerAddedIfNeeded,执行了之前装填在pipeline中的handler; 那么,之前在boss工作组的pipeline中装填的handler大家还记得么;
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
第一个handler是loggingHandler,我们看第二个,也就是acceptor处理器,这个处理器至关重要,他关系到后面的worker工作组的线程启动,这就是我在文章前面说,childrenHandler必须要有;到现在,整个boss工作组的服务都启动了,内部的线程也开始运行了,但是worker的线程还没有启动;接下来就是我刚才说的processSelectedKeys的作用了;我们回到processSelectedKeys中;
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
作者直接一点,这里最关键的是unsafe.read(),整个方法导致了worker的线程启动;这里是个unSafe,还记得之前说的unSafe是谁么;
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
没错,到这里,我们可以看到,fireChannelRead这个方法;经过追踪我们来到这里;
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
那么接下来就显而易见了,next.invokeChannelRead(m);
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
到了这里,大家应该就知道acceptor内部的方法是如何被调用的了;我们回到acceptor内部;
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
到了这里,是不是发现和boss的逻辑基本一致了;childGroup调用了register,其实和
ChannelFuture regFuture = config().group().register(channel);
就是一个东西了,那么接下来按照boss 的方式,worker的内部线程也就都启动了,这也就是为什么childHandler一定要有了,如果没有,在childGroup内部注册channel之后调用的handler是空的,那么线程就无法启动了;
好的,至此,服务启动的过程就完成了;
|