IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> JavaScript知识库 -> Netty源码学习笔记之serverBootstrap的启动分析 -> 正文阅读

[JavaScript知识库]Netty源码学习笔记之serverBootstrap的启动分析

serverBootstrap的启动

serverBootstrap 的启动从 bind() 方法进入

serverBootstrap.bind(port).sync();

然后一路跟进去,直到跟到 doBind() 方法

private ChannelFuture doBind(final SocketAddress localAddress) {
	//创建、初始化以及将该channel注册到selector上,异步操作
    final ChannelFuture regFuture = initAndRegister();
    //从异步结果中获取channel
    final Channel channel = regFuture.channel();
    //出现异常直接返回
    if (regFuture.cause() != null) {
        return regFuture;
    }
	
	//由于是异步操作,此时不能保证channel的操作完成
    if (regFuture.isDone()) {
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        //为异步结果添加监听器,当完成后会触发下面的逻辑
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    //绑定指定端口
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

创建、初始化以及注册channel

final ChannelFuture regFuture = initAndRegister();
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }

        ChannelFuture regFuture = config().group().register(channel);
        
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

这里重点有三行代码,接下来逐行分析

channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);

channelFactory.newChannel()

这行代码意思是创建一个channel,跟进去看,会进入到 io.netty.channel.ReflectiveChannelFactory#newChannel 方法

@Override
public T newChannel() {
    try {
        // 调用无参构造器创建channel
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}

从方法名不难看出,这是通过反射的方式来对 channel 进行创建,并且调用的是无参构造,因此我们直接找到 NioServerSocketChannel 直接从它的无参构造入手:

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
======================================================================
private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
        	//返回了一个Java Nio的原生channel
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

进入父类构造:

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
    	//设置为非阻塞
        ch.configureBlocking(false);
    } catch (IOException e) {
        ...
    }
}
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

简单来说,通过 NioServerSocketChannel 的无参构造器,先获取 nio 的原生 channel,并将这个原生 channel 复制到自己的 ch 成员变量上,将当前关注事件设为接受事件 SelectionKey.OP_ACCEPT, 然后通过父类构造,为其生成一个 idunsafe底层操作对象, 以及 pipeline,并将自己设置为非阻塞状态。至此,channel 的创建结束。

init(channel)

channel 创建完后,接下来就是初始化这个 channel

@Override
void init(Channel channel) {
	//把代码中的option和attr的参数搜集起来,设置到channel中
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

    ChannelPipeline p = channel.pipeline();
	
	//把child开头的参数搜集起来
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

    p.addLast(new ChannelInitializer<Channel>() {
    	//这个方法要eventLoop启动之后才会执行
        @Override
        public void initChannel(final Channel ch) {
        	//这里的ch是上文创建好的channel
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

这里先是记录了所有设置的参数,如 optionattr 参数等,然后将它们设置到 channel 中,同时将 child 开头的参数也搜集起来,用于初始化 childChannel 属性

这里讲一下这行代码:

pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

这里意思是当 bossEventLoop 创建好后,将一个 ServerBootstrapAcceptor 加入到 pipeline
这里先说结论,当 bossEventLoop 接收到请求后,会将请求交给 ServerBootstrapAcceptor 来处理,这就是 NettyReactor 模型

看下这个类:

ServerBootstrapAcceptor(
    final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
    Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    // See https://github.com/netty/netty/issues/1328
    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
            channel.config().setAutoRead(true);
        }
    };
}

这里构造方法仅仅只是把所有相关参数保存下来而已。
当客户端的连接发送到服务端时,ServerBootstrapAcceptor 会将其进行处理,主要是调用了下面这个方法:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg为客户端,这里将它转成Channel
    final Channel child = (Channel) msg;

    // 将上面保存好的参数设置到childChannel中
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // 将childChannel注册到selector 
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

至此,channel 的初始化工作完成。

config().group().register(channel)

接下来看这句代码

config().group().register(channel)

其中 config().group() 指的是 bossGroup,我们找到它的 register 方法

@Override
public ChannelFuture register(ChannelPromise promise) {
	//next()指的是选择一个eventLoop
    return next().register(promise);
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
	//这里设置 channel 的 eventLoop 为 bossEventLoop
    AbstractChannel.this.eventLoop = eventLoop;
	
	//判断当前线程与eventLoop绑定的线程是否为同一个线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            //...
        }
    }
}

跟进 register0 方法:

private void register0(ChannelPromise promise) {
    try {
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        //...
    }
}

继续 doRegister 方法:

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
            	//注册事件0,表示不感兴趣
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                //...
            }
        }
    }

这里 javaChannel() 获取到的就是 java nio 原生的channel,将原生的 channel 注册到原生的 selector,由于此时还没有发生 bind,也就是说还有没有 Active,因此这里第一次注册的时候会注册0,表示不关注任何事件。

这个 this 指的是,当发生事件时,会对事件进行处理,此时会把这个 channel 拿出来去做处理

doRegister 完成后,会回到前面 doBind 方法里的 doBind0 方法,这里我们先不着急去看 doBind0 方法,后面再说。

eventLoop.excute()

前面我们通过 eventLoop.excute() 去执行 register0 方法,现在我们看看这个 excute 方法做了什么

@Override
public void execute(Runnable task) {
    boolean inEventLoop = inEventLoop();
    //添加到任务队列
    addTask(task);
    if (!inEventLoop) {
    	//启动线程
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

这里是将任务添加到任务队列,然后启动线程(任务队列是在 eventLoop 初始化时进行创建的)。

private void startThread() {
    if (state == ST_NOT_STARTED) {
    	//CAS
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}

这里先通过 cas 的方式修改线程状态,修改成功后开始真正启动线程 doStartThread

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
            	//这里启动了一个不会停止的线程
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
            	//...
            }
        }
    });
}

这里的 execute 方法实际上是通过 ThreadExecutorMap 调用 eventLoop 里的 executorexecute 方法,并最终通过threadFactory.newThread(command).start();来进行启动。(不知道我这样理解对不对,要是不对可以评论指出来。详细可以看我上一篇文章 executor 初始化那部分)

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(apply(command, eventExecutor));
        }
    };
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    return new Runnable() {
        @Override
        public void run() {
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                setCurrentEventExecutor(null);
            }
        }
    };
}
@Override
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

这里eventLoop.excute()线程的启动说完,然后我们看看上文提到一个无限循环的线程SingleThreadEventExecutor.this.run();

protected void run() {
    for (;;) {
        try {
            try {
            	//选择就绪的channel
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                //-1,表示当前任务队列中没有任务
                case SelectStrategy.SELECT:
                	//进行阻塞选择
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
                }
            } catch (IOException e) {
                //...
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                	//处理就绪channel的io
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    //执行任务队列中的任务
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } 
    }
}

这段代码的逻辑主要是选择就绪的 channel,如果没有,则进行阻塞选择,阻塞选择后进入下面的逻辑对 channel 进行处理(就算进行阻塞选择也不一定有 channel 需要处理)
这段逻辑先到此结束先,不然要讲的实在是有点多,而且也不是重点,难度也不大,就不在这里写了。

绑定

doBind0

好了我们现在回到主线 doBind0

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

继续跟进channel.bind方法:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

这里由于 pipeline 有很多 Context,我们直接跳到具体实现 HeadContext

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    
    //判断有没有被激活
    boolean wasActive = isActive();
    try {
    	//绑定
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

最终看到了 doBind 方法,再跟进去:

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

最终我们发现,bind 调用的是 nio 原生的 channel 进行绑定

由于前面判断是否激活时时false,因此会走到下面的代码:

pipeline.fireChannelActive();

这里会走到 HeadContext 中的 channelActive 方法

 @Override
 public void channelActive(ChannelHandlerContext ctx) {
     ctx.fireChannelActive();
	 //注册读事件,包括:创建连接、读数据
     readIfIsAutoRead();
 }

跟进:

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

直接找到这个read方法

@Override
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
@Override
public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
        return;
    }

    try {
    	//这里开始注册读事件
        doBeginRead();
    } catch (final Exception e) {
        //...
    }
}

进入doBeginRead();,从这里开始注册读事件

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;
	
	//获得0
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
    	//注册读事件
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

由于前面注册的时候时先注册0,对任何事件不感兴趣,因此这里 interestOps 的值为0, 然后下面 selectionKey.interestOps(interestOps | readInterestOp); 则是真正的注册读事件。

至此 serverBootstrap 的启动分析完毕

总结

1.Channel 的创建是以反射的形式调用无参构造进行创建
2.初始化时,体现了 nettyreactor 模式
3.第一次注册时,是先注册0
4.eventLoop 的启动是在注册时进行启动的
5.最终监听是通过 bind 方法成功后的 fireChannelActive 来触发的

  JavaScript知识库 最新文章
ES6的相关知识点
react 函数式组件 & react其他一些总结
Vue基础超详细
前端JS也可以连点成线(Vue中运用 AntVG6)
Vue事件处理的基本使用
Vue后台项目的记录 (一)
前后端分离vue跨域,devServer配置proxy代理
TypeScript
初识vuex
vue项目安装包指令收集
上一篇文章      下一篇文章      查看所有文章
加:2021-11-15 15:45:41  更:2021-11-15 15:48:19 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 10:48:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码