前言
先来看一段很普通的客户端程序,定义 NioEventLoopGroup,然后通过 Bootstrap 去连接Server。
public static void main(String[] args) throws Exception {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("在这里可以添加自定义的 Handler");
}
});
System.out.println("客户端已就绪,准备连接服务");
ChannelFuture future = bootstrap.connect("127.0.0.1", 7000).sync();
future.channel().closeFuture().sync();
}
这里跟Server程序有几个不同点:
-
客户端只需要1个 NioEventLoopGroup,服务端需要2个 NioEventLoopGroup -
客户端使用 Bootstrap 启动连接,服务端使用 ServerBootstrap 启动监听,它们的继承关系如下图: -
客户端调用 connect() 连接 Server,服务端调用 bind() 绑定监听IP+PORT
这边我们主要讲的就是 connect() 方法的内部实现。
PS.本篇文章Netty源码,只会贴出关键性的部分,方便理解。
原理解析
connect() 的核心就是 doResolveAndConnect():(不重要,pass)
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
validate();
return doResolveAndConnect(remoteAddress, localAddress);
}
doResolveAndConnect 关键的也就两个方法:initAndRegister、doResolveAndConnect0
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
return promise;
}
1、initAndRegister: 初始化 NioSocketChannel,并注册到 NioEventLoopGroup 内部的 NioEventLoop
该方法的实现在基类 AbstractBootstrap.java,因此和服务端的启动基本一样,唯一不同的在于 init() 方法。用一张图来表示吧,比较清楚:
2、doResolveAndConnect0: 主要是调用 doConnect() 实现连接 Server
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
这里的连接需要调试跟踪,最终到达 AbstractNioChannel.java的内部类 AbstractNioUnsafe,执行此处的 connect 方法,下面我贴出几句关键代码:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
}
}
继续跟踪 doConnect 方法,因为连接不是马上完成的,所以这边有个判断:如果连接尚未成功,则设置 interestOps == SelectionKey.OP_CONNECT
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
既然设置了interestOps,那么 selector.select() 那边肯定会捕获到事件。 直接查看 NioEventLoop.java 的 processSelectedKey 方法,里面有个对 CONNECT 事件进行处理:
- 1、将 SelectionKey.OP_CONNECT 移除掉,不需要再监听
- 2、调用 unsafe.finishConnect()
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
unsafe.finishConnect() 内部会调用 fulfillConnectPromise。 做了这么多,最最最关键的就是这里。此方法会触发 channelActive 事件,从而继续触发 AbstractNioChannel 的 doBeginRead 方法。看过 Netty原理一:ServerBootstrap启动过程全解析 这篇文章应该就记得,这里会重新设置 interestOps,设置的值就是 NioSocketChannel 构造函数设置的 SelectionKey.OP_READ
最后,连接成功之后,整个结构如下图所示
|