Netty核心源码
一、Netty 启动过程源码剖析
NioEventLoop 类 的run() 代码 ,无限循环 ,在服务器端运行
1、源码的基本理解
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
说明
-
先看启动类:main 方法中,首先创建了关于 SSL 的配置类。 -
重点分析下 创建了两个 EventLoopGroup 对象:
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
. (1) 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。
(2) EventLoopGroup 是 事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)
(3) new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数*2, 即可以充分的利用多核的优势
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));
会创建 EventExecutor 数组 children = new EventExecutor[nThreads]; //debug 一下 每个元素的类型就是 NIOEventLoop, NIOEventLoop 实现了 EventLoop 接口 和 Executor 接口 try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化( 看下源码 allows easy bootstrap of {@link ServerChannel} )。和 它和 ServerChannel , 关联, 而 而 ServerChannel 继承了Channel法 ,有一些方法 remoteAddress 等
随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用【debug 下 group 方法
(4) 然后添加了一个 channel,其中参数一个 Class 对象,引导类将通过这个 Class 对象反射创建 ChannelFactory。然后添加了一些 TCP 的参数。[说明:Channel 的创建在 bind 方法,可以 Debug 下 bind ,会找到 channel = channelFactory.newChannel(); ]
(5) 再添加了一个服务器专属的日志处理器 handler。
(6) 再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。
(7) 然后绑定端口并阻塞至连接成功。
(8) 最后 main 线程阻塞等待关闭。
(9) finally 块中的代码将在服务器关闭时优雅关闭所有资源
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
说明:
这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。
2、EventLoopGroup 的过程
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
- 上面的
this(nThreads, (Executor) null); 调用构造器 ( 通过 alt+d 看即可)
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
- 上面的
this(nThreads, executor, SelectorProvider.provider()); 调用下面构造器
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
- 上面的
this ()... 调用构造器(alt+d)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
- 上面的
super() .. 的方法是父类: MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
- 追踪到源码抽象类 MultithreadEventExecutorGroup 的构造器方法 MultithreadEventExecutorGroup 才是
NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式 ,
3、MultithreadEventExecutorGroup
@param nThreads 使用的线程数,默认为 core *2 [可以追踪源码]
@param executor 执行器 : 如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor
@param chooserFactory 单例 new DefaultEventExecutorChooserFactory() @param args args 在创建执行器的时候传入固定参数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
说明:
- 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
- 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
- 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
- 根据线程选择工厂创建一个 线程选择器。
- 为每一个单例线程池添加一个关闭监听器。
- 将所有的单例线程池添加到一个 HashSet 中。
4、ServerBootstrap 创建和构造过程
- ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
说明:
- 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup属性
- channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
- option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
- handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是SocketChannel
- childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用
5、绑定端口
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
-
doBind 源码剖析, 核心是两个方法 initAndRegister 和 doBind0 private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
cause an
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
6、initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(),
GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
说明:
- 基本说明: initAndRegister() 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个future
- 通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。
- init 初始化这个 NioServerSocketChannel。
- config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册NioServerSocketChannel。
- 最后,返回这个异步执行的占位符即 regFuture。
init方法 会调用 addLast, 现在进入到 addLast 方法内查看
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
finalAbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
说明:
-
addLast 方法,在 DefaultChannelPipeline 类中 -
addLast 方法这就是 pipeline 方法的核心 -
检查该 handler 是否符合标准。 -
创 建 一 个 AbstractChannelHandlerContext 对 象 ,这 里 说 一 下 ,ChannelHandlerContext 对 象 是ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 Pipeline 中时,都会创建Context。
Context 的主要功能是管理他所关联的 Handler 和同一个 Pipeline 中的其他 Handler 之间的交互。
- 将 Context 添加到链表中。也就是追加到 tail 节点的前面。
- 最后,同步或者异步或者晚点异步的调用 callHandlerAdded0 方法
- 前面说了 dobind 方法有 2 个重要的步骤,initAndRegister 说完,接下来看 doBind0 方法, 代码如下
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
说明:
- 该方法的参数为 initAndRegister 的 future,NioServerSocketChannel,端口地址,NioServerSocketChannel 的promise
- 这里就可以根据前面下的断点,一直 debug:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
继续追踪 AbstractChannel 的
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
try {
fireChannelActive 的 方法,告诉所有的 handler ,已经成功绑定。
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
}
- 最终 doBind 就会追踪到 NioServerSocketChannel 的 doBind, 说明 Netty 底层使用的是 Nio
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
- 回到 bind 方法(alt+v)
- 最后一步:safeSetSuccess(promise),告诉 promise 任务成功了。其可以执行监听器的方法了。
- 继续 atl+V 服务器就回进入到(
NioEventLoop 类 )一个循环代码,进行监听
@Override
protected void run() {
for (;;) {
try {
}
7、Netty 启动过程梳理
- 创建 2 个 EventLoopGroup 线程池数组。数组默认大小 CPU*2,方便 chooser 选择线程池时提高性能
- BootStrap 将 boss 设置为 group 属性,将 worker 设置为 childer 属性
- 通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法
- initAndRegister 方法会反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。
- 在 register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方法会 调用 NioServerSocketChannel的 doBind 方法对 JDK 的 channel 和端口进行绑定,完成 Netty 服务器的所有启动,并开始监听连接事件
二、 Netty 接受请求过程源码剖析
1、源码剖析
2、源码分析过程
-
断点位置 NioEventLoop 的如下方法 processSelectedKey if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
-
执行浏览器 http://localhost:8007/ , 客户端发出请求 -
从的断点我们可以看到, readyOps 是 16 ,也就是 Accept 事件。说明浏览器的请求已经进来了 -
这个 unsafe 是 boss 线程中 NioServerSocketChannel 的 AbstractNioMessageChannel $ NioMessageUnsafe 对象。 -
进入到 AbstractNioMessageChannel $ NioMessageUnsafe 的 read 方法中 -
read 方法代码并分析
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
method
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
说明:
- 检查该 eventloop 线程是否是当前线程。assert eventLoop().inEventLoop()
- 执行 doReadMessages 方法,并传入一个 readBuf 变量,这个变量是一个 List,也就是容器。
- 循环容器,执行 pipeline.fireChannelRead(readBuf.get(i));
- doReadMessages 是读取 boss 线程中的 NioServerSocketChannel 接受到的请求。并把这些请求放进容器,一会我们 debug 下
doReadMessages 方法. - 循环遍历 容器中的所有请求,调用 pipeline 的 fireChannelRead 方法,用于处理这些接受的请求或者其他事件,在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的handler 的 ChannelRead 方法(debug 进入)
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
说明:
- 通过工具类,调用 NioServerSocketChannel 内部封装的 serverSocketChannel 的 accept 方法,这是 Nio 做法。
- 获取到一个 JDK 的 SocketChannel,然后,使用 NioSocketChannel 进行封装。最后添加到容器中
- 这样容器 buf 中就有了 NioSocketChannel
-
回到 read 方法,继续分析 循环执行 pipeline.fireChannelRead 方法 -
前面分析 doReadMessages 方法的作用是通过 ServerSocket 的 accept 方法获取到 Tcp 连接,然后封装成Netty 的 NioSocketChannel 对象。最后添加到 容器中 -
在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的 handler的 ChannelRead 方法(debug 进入) -
经过 dubug (多次),可以看到会反复执行多个 handler 的 ChannelRead ,我们知道,pipeline 里面又 4 个handler ,分别是 Head ,LoggingHandler ,ServerBootstrapAcceptor ,Tail。 -
重点看看 ServerBootstrapAcceptor 。debug 之后,断点会进入到 ServerBootstrapAcceptor 中来。我们来看看 ServerBootstrapAcceptor 的 channelRead 方法(要多次 debug 才可以)
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
说明:
1) msg 强转成 Channel ,实际上就是 NioSocketChannel 。
- 添加 NioSocketChannel 的 pipeline 的 handler ,就是我们 main 方法里面设置的 childHandler 方法里的。
3) 设置 NioSocketChannel 的各种属性。 4) 将该 NioSocketChannel 注册到 childGroup 中的一个 EventLoop 上,并添加一个监听器。 5) 这个 childGroup 就是我们 main 方法创建的数组 workerGroup。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
- 最终会调用
doBeginRead 方法,也就是 AbstractNioChannel 类的方法
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
- 这个地方调试时,请把前面的断点都去掉,然后启动服务器就会停止在 doBeginRead(需要先放过该断点,然后浏览器请求,才能看到效果)
- 执行到这里时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了
3、 Netty 接收请求过程梳理
总体流程:
接受连接----->创建一个新的 NioSocketChannel------>注册到一个 worker EventLoop 上---->注册 selecot Read 事件
- 服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类,该方法内部由 2 部分组成
- doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方法会像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config
- 随后执行 执行 pipeline.fireChannelRead 方法,并将自己绑定到一个 chooser 选择器选择的 workerGroup 中的一个 EventLoop。并且注册一个 0,表示注册成功,但并没有注册读(1)事件
三、Pipeline Handler HandlerContext 创建源码剖析
1、源码剖析
明天继续!!!
|