IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 理解Reactor后再来看netty的register、bind和监听accept流程 -> 正文阅读

[网络协议]理解Reactor后再来看netty的register、bind和监听accept流程

echo server 再解析

再看 echoserver 的代码流程:

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(5);

很明显,分别构建主从 Reactor组。主 Reactor 组内的 Reactor 数量为1,表示我们的 echoserver 目前只对外监听一个端口。

ServerBootstrap b = new ServerBootstrap();

简单看下 ServerBootstrap 代码,继承自 AbstractBootstrap:

// ServerBootstrap 配置 Reactor 组
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        super.group(parentGroup);
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
        return this;
    }

其实很规律,子类ServerBootstrap维持child(即从 Reactor)的配置,主类AbstractBootstrap维持主 Reactor 的配置。

            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     p.addLast(serverHandler);
                 }
             });

上述代码对 ServerBootstrap 配置参数,方便接下来的启动,即:

ChannelFuture f = b.bind(PORT).sync();

很自然,我们进入 bind ,这里就到了 AbstractBootstrap 的 doBind 方法:

private ChannelFuture doBind(final SocketAddress localAddress)

doBind 流程详解

// doBind():
final ChannelFuture regFuture = initAndRegister();

很明显是异步执行,我们应该对 ChannelFuture 比较熟悉了。那么 initAndRegister 中执行为:

// initAndRegister
            channel = channelFactory.newChannel();
            init(channel);

这里也比较熟悉了,注意是创建的 NioServerSocketChannel(或者对应NIO中的 ServerSocketChannel),然后将要对其初始化:

// init():
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // 这里的 handler 实际就是demo代码中配置的LoggingHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
               	// 注意下面这里,子 Reactor 中会使用。现在不会执行。
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

比较简单,主要是对 NioServerSocketChannel 的 pipeline 添加一个业务处理器。联系上一篇文章,很容易想到就是当客户端连接接入后,主 Reactor 会将调用这个pipeline中的执行单元。

但是要注意,这里调用 addLast 向 pipeline 中添加handler时,addLast 内部还有一个特殊逻辑,如下:

//addLast
            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

如果还没有注册,就会在 pipeline 中构造一个 PendingHandlerAddedTask(我们后文会遇到),其中自然维持着pipeline 和 ChannelHandlerContext 的引用。再说明一下,pipeline 链表中的元素就是:ChannelHandlerContext

// initAndRegister
ChannelFuture regFuture = config().group().register(channel);

再回到 initAndRegister ,这里也是异步去执行 register 操作。所以我们跟着去看一下,注意这里的 config().group() 都是在 AbstractBootstrap 中执行,同时意味着group 是主 Reactor 的group。

next().register(channel);

next()是很正常的,因为 group 中可能存在多个 Reactor,那么我们选择哪个 Reactor 来执行就需要一个算法来决定。接着看:

// 这里的 this 即指 next() 找出来的 Reactor
NioServerSocketChannel.unsafe().register(this, promise);

上述代码改了点,只是说明意思:是继续使用 NioServerSocketChannel 对应的 unsafe 类执行注册操作,当然也是异步的。并且这里传入了 promise,这也就意味着内部的流程中会修改promise状态,以实现回调。unsafe类一听就和 java里的一样,是执行底层操作的。我们接着看:

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                }
            }

这里的 eventLoop 自然还是 主Reactor,当前线程是main,所以会走到 else方法体中。可见,这里直接将 register0 作为一个异步任务传给 eventLoop 执行,并且传入 promise(意味着会在内部设置状态,从而进行回调)。

值得注意的是,eventLoop.execute() 执行的内部,就会将 主Reactor(NioEventLoop) 启动起来,也就是启动了 Reactor 的核心循环。核心循环里面处理哪些东西,我们放到之后讲解。而且,经过之前文章的学习,这里应该有印象。核心循环中一个重要的步骤就是处理任务队列中的异步任务,也就是这里的 register0。

我们先认为 register0 还没有执行,因为异步的关系,先来回顾前面的 doBind() 流程:

    private ChannelFuture doBind(final SocketAddress localAddress) {
    	// 下面这行就是之前执行的
        final ChannelFuture regFuture = initAndRegister();

        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    promise.registered();
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }

非常清晰,regFuture 还没执行完毕,所以进入 else代码块。代码块中给 regFuture 设置回调方法,意味着当上文异步的 register0 执行完毕执行回调后,会紧接着执行 doBind0 方法。

而现在的 doBind 方法中,会直接返回 PendingRegistrationPromise,main线程会持续等待,直到 bind流程执行完毕:

hannelFuture f = b.bind(PORT).sync();

那么我们接下来再回过头去看下 register0 和 doBind0 的流程。

register0流程

再回到 register0 的流程,这里只按照执行过程来说明,而不按照方法来划分。注意,这里已经是 主Reactor 的流程了,循环中处理异步task任务,也即这里的 register0。

// 1. 
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

javaChannel 表示 NioServerSocketChannel 对应的 javaNIO中的ServerSocketChannel,调用其 register 方法,将 selector(IO 多路复用器) 传入,设置监听事件为0,并且设置附带信息 attachment为this(即 NioServerSocketChannel)。

方法返回 SelectKey,可以看下包含哪些:而这些信息就是之后会从 SelectKey 中获取到的。

selectionKey = {SelectionKeyImpl@1847} 
 channel = {ServerSocketChannelImpl@1828} "sun.nio.ch.ServerSocketChannelImpl[unbound]"
 selector = {KQueueSelectorImpl@1850} 
 index = 0
 interestOps = 0
 readyOps = 0
 valid = true
 attachment = {NioServerSocketChannel@1440} "[id: 0x93bcb693]"

以上是我电脑环境中此时的 SelectKey,可以看到监听事件 interestOps 为0。

这里经过 Register后,已经将 NioServerSocketChannel attach在主Reactor中selector出来的selectionKey里了。

// 2. 
pipeline.invokeHandlerAddedIfNeeded();

该语句内部就会调用 init 中放入 pipeline 中的 ChannelInitializer 方法。这里就用到了上文提到的 PendingHandlerAddedTask 来关联这个待处理的 ChannelInitializer:

// init():
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                // 这里的 handler 实际就是demo代码中配置的LoggingHandler
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // 注意下面这里,子 Reactor 中会使用。现在不会执行。
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

并且,会在调用完毕后,执行 remove将其移除掉:

            try {
                initChannel((C) ctx.channel());
                ...
            } 
            finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }

然后执行如下方法:

// 3.
safeSetSuccess(promise);

就会回调:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

很明显还是提交异步任务到 主Reactor 的任务队列中。提交完成后,又回到 register0 的流程中:

safeSetSuccess(promise);
// 即回到该处继续执行
pipeline.fireChannelRegistered();

也就是依次调用 Pipeline 中的各 handler 去执行 channelRegistered 方法。

doBind0流程

那么 bind 的真正执行就是在 主Reactor 从任务队列中获取到 doBind 任务后触发,所以会执行:

            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }

这里的bind事件也是要依次执行 Pipeline 上的handler链,只不过这次是从 尾部tail开始,追踪调用过程,是:

  1. 调用 LogginHandler 的bind方法
  2. 调用 head handler 的bind方法,此时方法内执行 unsafe.bind()

而 unsafe 的 bind 就是调用NIO的bind过程。并在调用完成后设置 promise,main线程就会继续执行bind后的操作:

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // 会继续执行以下操作
            System.out.println("bind is done");
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

也就是一直阻塞等待直到服务器socket关闭。

但是在 unsafe的bind 操作中,又提交了一个异步任务,如下:

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

也就是将 channelActive 事件在 pipeline 中传播,而当 Head Handler处理该事件时,又会在产生 read 事件,如下:

        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            // 调用后会产生 read 事件进行传播
            readIfIsAutoRead();
        }

而 read 事件是 outbound类型,所以会从 pipeline 的尾部开始传递。当再次传到 head时,会调用到 unsafe 的 read 方法,然后内部执行过程如下:

// 这里的代码是 NioServerSocketChannel内部执行
    protected void doBeginRead() throws Exception {
        // 这里的 selectionKey 就是 register0 中注册的
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

上述代码就是添加 OP_ACCEPT 事件到 selector,让其监听。

这样,服务端就已经绑定好了端口,并等待客户端接入了。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 18:20:11  更:2022-04-18 18:20:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 6:42:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码