IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Day477.Netty核心源码 -netty -> 正文阅读

[Java知识库]Day477.Netty核心源码 -netty

Netty核心源码

一、Netty 启动过程源码剖析

NioEventLoop 类 的run() 代码 ,无限循环,在服务器端运行

在这里插入图片描述

1、源码的基本理解

//服务器启动类源码
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

        } else {
            sslCtx = null;
        }
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) {
                            p.addLast(sslCtx.newHandler(ch.alloc()));
                        }
                        //p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast(new EchoServerHandler());
                    }
                });
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

说明

  1. 先看启动类:main 方法中,首先创建了关于 SSL 的配置类。

  2. 重点分析下 创建了两个 EventLoopGroup 对象:

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. EventLoopGroup workerGroup = new NioEventLoopGroup();

.
(1) 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。

(2) EventLoopGroup 是 事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)

(3) new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数*2, 即可以充分的利用多核的优势

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));

会创建 EventExecutor 数组 children = new EventExecutor[nThreads]; //debug 一下
每个元素的类型就是 NIOEventLoop, NIOEventLoop 实现了 EventLoop 接口 和 Executor 接口
try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化( 看下源码 allows easy bootstrap of {@link ServerChannel} )。和 它和 ServerChannel , 关联, 而 而 ServerChannel 继承了Channel法 ,有一些方法 remoteAddress 等

随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用【debug 下 group 方法

/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/

(4) 然后添加了一个 channel,其中参数一个 Class 对象,引导类将通过这个 Class 对象反射创建
ChannelFactory。然后添加了一些 TCP 的参数。[说明:Channel 的创建在 bind 方法,可以 Debug 下 bind ,会找到 channel = channelFactory.newChannel(); ]

(5) 再添加了一个服务器专属的日志处理器 handler。

(6) 再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。

(7) 然后绑定端口并阻塞至连接成功。

(8) 最后 main 线程阻塞等待关闭。

(9) finally 块中的代码将在服务器关闭时优雅关闭所有资源

//服务器端处理器源码
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

说明

这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。


2、EventLoopGroup 的过程

  • 构造器方法
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}
  • 上面的 this(nThreads, (Executor) null); 调用构造器 ( 通过 alt+d 看即可)
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}
  • 上面的 this(nThreads, executor, SelectorProvider.provider()); 调用下面构造器
public NioEventLoopGroup(
    int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
  • 上面的 this ()... 调用构造器(alt+d)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
  • 上面的 super() .. 的方法是父类: MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
  • 追踪到源码抽象类 MultithreadEventExecutorGroup 的构造器方法 MultithreadEventExecutorGroup 才是
    NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式,

3、MultithreadEventExecutorGroup

  • 参数说明

@param nThreads 使用的线程数,默认为 core *2 [可以追踪源码]

@param executor执行器 : 如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor

@param chooserFactory 单例 new DefaultEventExecutorChooserFactory()
@param args args 在创建执行器的时候传入固定参数

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    if (executor == null) { //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    //创建指定线程数的执行器数组
    children = new EventExecutor[nThreads];
    //初始化线程数组
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 创建 new NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            // 如果创建失败,优雅关闭
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    chooser = chooserFactory.newChooser(children);
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    //为每一个单例线程池添加一个关闭监听器
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    //将所有的单例线程池添加到一个 HashSet 中。
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

说明:

  1. 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
  2. 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
  3. 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
  4. 根据线程选择工厂创建一个 线程选择器。
  5. 为每一个单例线程池添加一个关闭监听器。
  6. 将所有的单例线程池添加到一个 HashSet 中。

4、ServerBootstrap 创建和构造过程

  • ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
//config 对象,会在后面起很大作用
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
  • ServerBootstrap 基本使用情况
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 100)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) {
                p.addLast(sslCtx.newHandler(ch.alloc()));
            }
            //p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(new EchoServerHandler());
        }
    });

说明:

  1. 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup属性
  2. channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
  3. option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
  4. handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是SocketChannel
  5. childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用

5、绑定端口

  • 服务器就是在这个 bind 方法里启动完成的

  • bind 方法代码, 追踪到创建了一个端口对象,并做了一些空判断, 核心代码 doBind

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}
  • doBind 源码剖析, 核心是两个方法 initAndRegister 和 doBind0

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            //At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            //============================================
            //说明:执行 doBind0 方法,完成对端口的绑定
            //============================================
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not
                        cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

6、initAndRegister

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {// 说明: channelFactory.newChannel() 方法 的作用 通过 ServerBootstrap 的通道工厂反射创建一个NioServerSocketChannel, 具体追踪源码可以得到下面结论
        /*
    (1) 通过 NIO 的 SelectorProvider 的 openServerSocketChannel 方法得到 JDK 的 channel。目的是让 Netty 包装 JDK 的 channel。
    (2) 创建了一个唯一的 ChannelId,创建了一个 NioMessageUnsafe,用于操作消息,创建了一个 DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
    (3) 创建了一个 NioServerSocketChannelConfig 对象,用于对外展示一些配置。
        */
        channel = channelFactory.newChannel();//NioServerSocketChannel
        // 说明:init 个 初始化这个 NioServerSocketChannel, 具体追踪源码可以得到如下结论
        /*
      (1) init 方法,这是个抽象方法(AbstractBootstrap 类的)由 ,由 ServerBootstrap 实现(可以追一下源码 //setChannelOptions(channel, options, logger); )。
      (2) 设置 NioServerSocketChannel 的 的 TCP 属性。
      (3) 由于 LinkedHashMap 是非线程安全的,使用同步进行处理。
      (4) 对 NioServerSocketChannel 的 的 ChannelPipeline 加 添加 ChannelInitializer 处理器。
      (5) 可以看出, init 和 的方法的核心作用在和 ChannelPipeline 相关。
      (6) 从NioServerSocketChannel 的初始化过程中,我们知道,pipeline 是一个双向链表,并了且,他本身就初始化了 head 和tail的 ,这里调用了他的 addLast 个方法,也就是将整个handler到插入到tail的
为前面,因为 tail 永远会在后面,需要做一些系统的固定工作。
        */
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(),
                                         GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

说明:

  1. 基本说明: initAndRegister() 初始化 NioServerSocketChannel 通道并注册各个 handler,返回一个future
  2. 通过 ServerBootstrap 的通道工厂反射创建一个 NioServerSocketChannel。
  3. init 初始化这个 NioServerSocketChannel。
  4. config().group().register(channel) 通过 ServerBootstrap 的 bossGroup 注册NioServerSocketChannel。
  5. 最后,返回这个异步执行的占位符即 regFuture。
  • init方法 会调用 addLast, 现在进入到 addLast 方法内查看
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    finalAbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

说明:

  1. addLast 方法,在 DefaultChannelPipeline 类中

  2. addLast 方法这就是 pipeline 方法的核心

  3. 检查该 handler 是否符合标准。

  4. 创 建 一 个 AbstractChannelHandlerContext 对 象 ,这 里 说 一 下 ,ChannelHandlerContext 对 象 是ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 Pipeline 中时,都会创建Context。

Context 的主要功能是管理他所关联的 Handler 和同一个 Pipeline 中的其他 Handler 之间的交互。

  1. 将 Context 添加到链表中。也就是追加到 tail 节点的前面。
  1. 最后,同步或者异步或者晚点异步的调用 callHandlerAdded0 方法
  • 前面说了 dobind 方法有 2 个重要的步骤,initAndRegister 说完,接下来看 doBind0 方法, 代码如下
private static void doBind0(
    final ChannelFuture regFuture, final Channel channel,
    final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //bind 方法这里下断点,这里下断点,来玩!!
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

说明:

  1. 该方法的参数为 initAndRegister 的 future,NioServerSocketChannel,端口地址,NioServerSocketChannel 的promise
  2. 这里就可以根据前面下的断点,一直 debug:
//将调用 LoggingHandler 的 invokeBind 方法, 最后会追到
   //DefaultChannelPipeline 类的 bind
   //然后进入到 unsafe.bind 方法 debug , 注意要追踪到
   // unsafe.bind , 要 要 debug 第二圈的时候,才能看到.
   @Override
   public void bind(
   ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
   throws Exception {
   unsafe.bind(localAddress, promise);
}
继续追踪 AbstractChannelpublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
   //....
   try {
       //!!!!旗 小红旗 是 可以看到,这里最终的方法就是 doBind 方法,执行成功后,执行通道的
       fireChannelActive 的 方法,告诉所有的 handler ,已经成功绑定。
           doBind(localAddress);//
   } catch (Throwable t) {
       safeSetFailure(promise, t);
       closeIfClosed();
       return;
   }
}
  1. 最终 doBind 就会追踪到 NioServerSocketChannel 的 doBind, 说明 Netty 底层使用的是 Nio
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
  • 回到 bind 方法(alt+v)
  • 最后一步:safeSetSuccess(promise),告诉 promise 任务成功了。其可以执行监听器的方法了。
  • 继续 atl+V 服务器就回进入到(NioEventLoop 类)一个循环代码,进行监听
@Override
protected void run() {
    for (;;) {
        try {
        }

7、Netty 启动过程梳理

  1. 创建 2 个 EventLoopGroup 线程池数组。数组默认大小 CPU*2,方便 chooser 选择线程池时提高性能
  2. BootStrap 将 boss 设置为 group 属性,将 worker 设置为 childer 属性
  3. 通过 bind 方法启动,内部重要方法为 initAndRegister 和 dobind 方法
  4. initAndRegister 方法会反射创建 NioServerSocketChannel 及其相关的 NIO 的对象, pipeline , unsafe,同时也为 pipeline 初始了 head 节点和 tail 节点。
  5. 在 register0 方法成功以后调用在 dobind 方法中调用 doBind0 方法,该方法会 调用 NioServerSocketChannel的 doBind 方法对 JDK 的 channel 和端口进行绑定,完成 Netty 服务器的所有启动,并开始监听连接事件

二、 Netty 接受请求过程源码剖析

1、源码剖析

  • 说明

    1. 从之前服务器启动的源码中,我们得知,服务器最终注册了一个 Accept 事件等待客户端的连接。
    2. NioServerSocketChannel 将自己注册到了 boss 单例线程池(reactor 线程)上,也就是 EventLoop
  • EventLoop 的作用是一个死循环,而这个循环中做 3 件事情:

    1. 有条件的等待 Nio 事件。
    2. 处理 Nio 事件。
    3. 处理消息队列中的任务。
    4. 仍用前面的项目来分析:进入到 NioEventLoop 源码中后,在 private void processSelectedKey(SelectionKey k,
    5. AbstractNioChannel ch) 方法开始调试最终我们要分析到 AbstractNioChannel 的 doBeginRead 方法, 当到这个方法时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了

2、源码分析过程

  • 断点位置 NioEventLoop 的如下方法 processSelectedKey

    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read(); //断点位置
    }
    
  • 执行浏览器 http://localhost:8007/, 客户端发出请求

  • 从的断点我们可以看到, readyOps 是 16 ,也就是 Accept 事件。说明浏览器的请求已经进来了

  • 这个 unsafe 是 boss 线程中 NioServerSocketChannel 的 AbstractNioMessageChannel $ NioMessageUnsafe 对象。

  • 进入到 AbstractNioMessageChannel $ NioMessageUnsafe 的 read 方法中

  • read 方法代码并分析

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);
    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }
                allocHandle.incMessagesRead(localRead);
            } while (allocHandle.continueReading());
        } catch (Throwable t) {
            exception = t;
        }
        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();
        if (exception != null) {
            closed = closeOnReadError(exception);
            pipeline.fireExceptionCaught(exception);
        }
        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...)
        method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
    }
}

说明:

  1. 检查该 eventloop 线程是否是当前线程。assert eventLoop().inEventLoop()
  2. 执行 doReadMessages 方法,并传入一个 readBuf 变量,这个变量是一个 List,也就是容器。
  3. 循环容器,执行 pipeline.fireChannelRead(readBuf.get(i));
  4. doReadMessages 是读取 boss 线程中的 NioServerSocketChannel 接受到的请求。并把这些请求放进容器,一会我们 debug 下 doReadMessages 方法.
  5. 循环遍历 容器中的所有请求,调用 pipeline 的 fireChannelRead 方法,用于处理这些接受的请求或者其他事件,在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的handler 的 ChannelRead 方法(debug 进入)
  • doReadMessages 方法
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    buf.add(new NioSocketChannel(this, ch));
    return 1;
}

说明:

  1. 通过工具类,调用 NioServerSocketChannel 内部封装的 serverSocketChannel 的 accept 方法,这是 Nio 做法。
  2. 获取到一个 JDK 的 SocketChannel,然后,使用 NioSocketChannel 进行封装。最后添加到容器中
  3. 这样容器 buf 中就有了 NioSocketChannel
  1. 回到 read 方法,继续分析 循环执行 pipeline.fireChannelRead 方法

  2. 前面分析 doReadMessages 方法的作用是通过 ServerSocket 的 accept 方法获取到 Tcp 连接,然后封装成Netty 的 NioSocketChannel 对象。最后添加到 容器中

  3. 在 read 方法中,循环调用 ServerSocket 的 pipeline 的 fireChannelRead 方法, 开始执行 管道中的 handler的 ChannelRead 方法(debug 进入)

  4. 经过 dubug (多次),可以看到会反复执行多个 handler 的 ChannelRead ,我们知道,pipeline 里面又 4 个handler ,分别是 Head ,LoggingHandler ,ServerBootstrapAcceptor ,Tail。

  5. 重点看看 ServerBootstrapAcceptor。debug 之后,断点会进入到 ServerBootstrapAcceptor 中来。我们来看看 ServerBootstrapAcceptor 的 channelRead 方法(要多次 debug 才可以)

  • channelRead 方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }
    try {//将客户端连接注册到 worker 线程池
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

说明:

 1) msg 强转成 Channel ,实际上就是 NioSocketChannel 。
  1. 添加 NioSocketChannel 的 pipeline 的 handler ,就是我们 main 方法里面设置的 childHandler 方法里的。
    3) 设置 NioSocketChannel 的各种属性。
    4) 将该 NioSocketChannel 注册到 childGroup 中的一个 EventLoop 上,并添加一个监听器。
    5) 这个 childGroup 就是我们 main 方法创建的数组 workerGroup。
  • register 方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);//进入到这里
            }
        });
    }
}
  • 最终会调用 doBeginRead 方法,也就是 AbstractNioChannel 类的方法
@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey; //断点
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
  • 这个地方调试时,请把前面的断点都去掉,然后启动服务器就会停止在 doBeginRead(需要先放过该断点,然后浏览器请求,才能看到效果)
  • 执行到这里时,针对于这个客户端的连接就完成了,接下来就可以监听读事件了

3、 Netty 接收请求过程梳理

总体流程

接受连接----->创建一个新的 NioSocketChannel------>注册到一个 worker EventLoop 上---->注册 selecot Read 事件

  1. 服务器轮询 Accept 事件,获取事件后调用 unsafe 的 read 方法,这个 unsafe 是 ServerSocket 的内部类,该方法内部由 2 部分组成
  2. doReadMessages 用于创建 NioSocketChannel 对象,该对象包装 JDK 的 Nio Channel 客户端。该方法会像创建 ServerSocketChanel 类似创建相关的 pipeline , unsafe,config
  3. 随后执行 执行 pipeline.fireChannelRead 方法,并将自己绑定到一个 chooser 选择器选择的 workerGroup 中的一个 EventLoop。并且注册一个 0,表示注册成功,但并没有注册读(1)事件

三、Pipeline Handler HandlerContext 创建源码剖析

1、源码剖析

明天继续!!!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-12-14 15:48:10  更:2021-12-14 15:49:05 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:55:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码