serverBootstrap的启动
serverBootstrap 的启动从 bind() 方法进入
serverBootstrap.bind(port).sync();
然后一路跟进去,直到跟到 doBind() 方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
创建、初始化以及注册channel
final ChannelFuture regFuture = initAndRegister();
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
这里重点有三行代码,接下来逐行分析
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
channelFactory.newChannel()
这行代码意思是创建一个channel,跟进去看,会进入到 io.netty.channel.ReflectiveChannelFactory#newChannel 方法
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
从方法名不难看出,这是通过反射的方式来对 channel 进行创建,并且调用的是无参构造,因此我们直接找到 NioServerSocketChannel 直接从它的无参构造入手:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
======================================================================
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
进入父类构造:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
...
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
简单来说,通过 NioServerSocketChannel 的无参构造器,先获取 nio 的原生 channel,并将这个原生 channel 复制到自己的 ch 成员变量上,将当前关注事件设为接受事件 SelectionKey.OP_ACCEPT, 然后通过父类构造,为其生成一个 id,unsafe底层操作对象, 以及 pipeline,并将自己设置为非阻塞状态。至此,channel 的创建结束。
init(channel)
channel 创建完后,接下来就是初始化这个 channel
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(newOptionArray(0));
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里先是记录了所有设置的参数,如 option、attr 参数等,然后将它们设置到 channel 中,同时将 child 开头的参数也搜集起来,用于初始化 childChannel 属性
这里讲一下这行代码:
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
这里意思是当 bossEventLoop 创建好后,将一个 ServerBootstrapAcceptor 加入到 pipeline 中 这里先说结论,当 bossEventLoop 接收到请求后,会将请求交给 ServerBootstrapAcceptor 来处理,这就是 Netty 的 Reactor 模型
看下这个类:
ServerBootstrapAcceptor(
final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
enableAutoReadTask = new Runnable() {
@Override
public void run() {
channel.config().setAutoRead(true);
}
};
}
这里构造方法仅仅只是把所有相关参数保存下来而已。 当客户端的连接发送到服务端时,ServerBootstrapAcceptor 会将其进行处理,主要是调用了下面这个方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
至此,channel 的初始化工作完成。
config().group().register(channel)
接下来看这句代码
config().group().register(channel)
其中 config().group() 指的是 bossGroup,我们找到它的 register 方法
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
跟进 register0 方法:
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
继续 doRegister 方法:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
这里 javaChannel() 获取到的就是 java nio 原生的channel,将原生的 channel 注册到原生的 selector,由于此时还没有发生 bind,也就是说还有没有 Active,因此这里第一次注册的时候会注册0,表示不关注任何事件。
这个 this 指的是,当发生事件时,会对事件进行处理,此时会把这个 channel 拿出来去做处理
当 doRegister 完成后,会回到前面 doBind 方法里的 doBind0 方法,这里我们先不着急去看 doBind0 方法,后面再说。
eventLoop.excute()
前面我们通过 eventLoop.excute() 去执行 register0 方法,现在我们看看这个 excute 方法做了什么
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
这里是将任务添加到任务队列,然后启动线程(任务队列是在 eventLoop 初始化时进行创建的)。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
这里先通过 cas 的方式修改线程状态,修改成功后开始真正启动线程 doStartThread:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
}
});
}
这里的 execute 方法实际上是通过 ThreadExecutorMap 调用 eventLoop 里的 executor 的 execute 方法,并最终通过threadFactory.newThread(command).start(); 来进行启动。(不知道我这样理解对不对,要是不对可以评论指出来。详细可以看我上一篇文章 executor 初始化那部分)
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
return new Executor() {
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
这里eventLoop.excute() 线程的启动说完,然后我们看看上文提到一个无限循环的线程SingleThreadEventExecutor.this.run();
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
}
}
这段代码的逻辑主要是选择就绪的 channel,如果没有,则进行阻塞选择,阻塞选择后进入下面的逻辑对 channel 进行处理(就算进行阻塞选择也不一定有 channel 需要处理) 这段逻辑先到此结束先,不然要讲的实在是有点多,而且也不是重点,难度也不大,就不在这里写了。
绑定
doBind0
好了我们现在回到主线 doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
继续跟进channel.bind 方法:
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
这里由于 pipeline 有很多 Context,我们直接跳到具体实现 HeadContext 中
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
最终看到了 doBind 方法,再跟进去:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
最终我们发现,bind 调用的是 nio 原生的 channel 进行绑定
由于前面判断是否激活时时false,因此会走到下面的代码:
pipeline.fireChannelActive();
这里会走到 HeadContext 中的 channelActive 方法
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
跟进:
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
直接找到这个read方法
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
}
}
进入doBeginRead(); ,从这里开始注册读事件
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
由于前面注册的时候时先注册0,对任何事件不感兴趣,因此这里 interestOps 的值为0, 然后下面 selectionKey.interestOps(interestOps | readInterestOp); 则是真正的注册读事件。
至此 serverBootstrap 的启动分析完毕
总结
1.Channel 的创建是以反射的形式调用无参构造进行创建 2.初始化时,体现了 netty 的 reactor 模式 3.第一次注册时,是先注册0 4.eventLoop 的启动是在注册时进行启动的 5.最终监听是通过 bind 方法成功后的 fireChannelActive 来触发的
|