IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 浅谈Netty -> 正文阅读

[网络协议]浅谈Netty

在学习Netty之前,建议大家先学习一下NIO的相关知识哈😀
传送门🚪 NIO的理解和使用

一、介绍

Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端,简单来说,就是用于网络编程的底层框架,向TCP、UDP的通信,或者项目中使用到的RPC框架,如:dubbo等都是使用的Netty作为底层框架

在学习Netty之前,我们要想一下为什么要学习Netty呢?我的回答是

  • 可以更好的理解NIO
  • 了解到多路复用的思想
  • 学习到优秀的设计模式
  • 为网络编程打下良好的基础

二、服务端案例

我们来看一下Netty的服务端简单实现吧😇

public class TestServer {
    public static void main(String[] args) {
        // 1. 启动器,负责组装 netty 组件,启动服务器
        new ServerBootstrap()
                // 2. BossEventLoop, WorkerEventLoop(selector,thread), group 组
                .group(new NioEventLoopGroup())
                // 3. 选择 服务器的 ServerSocketChannel 实现
                .channel(NioServerSocketChannel.class) // OIO BIO
                // 4. boss 负责处理连接 worker(child) 负责处理读写,决定了 worker(child) 能执行哪些操作(handler)
                .childHandler(
                        // 5. channel 代表和客户端进行数据读写的通道 Initializer 初始化,负责添加别的 handler
                        new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                // 6. 添加具体 handler
                                ch.pipeline().addLast(new LoggingHandler());
                                ch.pipeline                   ().addLast(new StringDecoder()); // 将 ByteBuf 转换为字符串
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { // 自定义 handler
                                    @Override // 读事件
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println(msg); // 打印上一步转换好的字符串
                                    }
                                });
                            }
                        })
                // 7. 绑定监听端口
                .bind(8080);
    }
}

就这十几行代码,就能够实现服务端且异步的多路复用,简单高效。我们先来解读一下这些代码

  • 创建的入口是ServerBootstrap类,它是Netty的启动类其实就是包装了Netty的属性配置、子处理器、子组别等等。
  • gourp组别表示创建的线程池类型,Netty中的线程池顶层接口是EventExecutorGroup,它继承了ScheduledExecutorService。
  • channel表示创建的通道类型。
  • childHandler是Netty的子处理器,这里有子,必然有父,那父子都是干嘛的呢?父负责的是Accept接收任务,读写任务由子处理。
  • 在initChannel中有多个pipeline,这些pipeline就是具体的执行逻辑,之间的通信靠ChannelHandlerContext全局对象来传递。

三、常见类

在源码分析Netty之间,我们先来看看Netty中的比较重要的类

🌍Future

public interface Future<V> extends java.util.concurrent.Future<V> {
	...
	Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
	Future<V> sync() throws InterruptedException;
	...
}

可以看到,Netty中的Future继承了Java的Future,并多了一些同步和异步的方法,其实就是对java的Future的封装,以满足Netty的业务需求。

🌍Promise

public interface Promise<V> extends Future<V> 

Promise继承了Future ,也可以当成一个异步接收对象。

🌍ChannelFuture

public interface ChannelFuture extends Future<Void> {}

继承了Future重写了方法以满足Channel通道上的异步结果的接收。

🌍 EventLoopGroup

public interface EventLoopGroup extends EventExecutorGroup {
	...
	@Override
    EventLoop next();
    ChannelFuture register(Channel channel);
    ...
}

是Netty中线程池的抽象类

🌍EventLoop

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

EventLoop可以处理所有的I/O操作。一个EventLoop 实例将处理多个channel。其实就是Netty中多路复用器的接口,具体的多路复用器实现这个接口,比如NioEventLoop。

🌍ChannelPipeline

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>>

它是各种入栈出栈处理器的集合。
ChannelInboundInvoker就是对应的入栈,可以理解为是读操作的handle。
ChannelOutboundInvoker对应的出栈,理解为是写操作的handle。

四、源码分析

下面我们来跟着源码来分析Netty

🏝?服务端的初始化过程

在这里插入图片描述
??体现了Netty的异步策略的代码如下
在init方法中

ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });

init方法在主线程中执行,使用了线程池的executre方法,也就是使用NIO线程来执行这个任务,当然,在此时NIO线程还未开启,所以,这个任务被放入到了Queue任务队列中。

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                    ...
}

这里开启了NIO的线程池,开始去处理任务和监听io事件。

🏝?下面我们来看看Netty的NIO线程是怎么监听事件的

在这里插入图片描述
看看关键代码吧😊

protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.BUSY_WAIT:
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                    }
                } catch (IOException e) {
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
						....

??需要注意的点

  • 这里是由NIO的线程执行的且无限循环
  • ioRatio 是io的比例,也就是执行select监测方法和执行任务的比例。默认是1:1

下面来看看select方法

 private void select(boolean oldWakenUp) throws IOException {
 		.....
        Selector selector = this.selector;
        if (......) {
        selector.selectNow();
        }
        selectCnt ++;
        selector = selectRebuildSelector(selectCnt);  
        .....
}

??注意几点

  • 这个方法重构了selector,避免了在Linux环境下出现的轮询bug
  • 引入了计数器和阈值,避免一直循环
  • 当某些条件成立判断channel中有可读数据时,调用selectNow方法

下面来看看服务端监听读事件并处理的过程
在这里插入图片描述
??关键的代码有

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 
	unsafe.read(); 
}
public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
                ....

🚩需要知道的是

  • 读操作也是通过底层原子类unsafe来设置缓冲区buf的大小
  • 将读取的数据放入buf中,然后再遍历buf,pipeline中读处理器一个字节一个字节的读取处理,当buf全部读取完毕,则调用回调方法通知。

五、小总结

  1. Netty中使用了很多的异步处理方法,形如addListener等字样的代码都是异步,异步的意思是当前线程创建任务,由其他的线程去执行这个任务,当某个时间完成了这个任务,则回调方法返回异步结果对象。
  2. Netty中封装了线程池和Future等,在继承的基础上添加了很多异步和同步的处理方法。
  3. Netty使用了ioRatio属性很好的权衡了io处理和任务处理的执行比例。
  4. Netty使用重构的方式将NIO原生的selector重构,避免了在Linux中select轮询的bug。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:40:35  更:2022-04-15 00:44:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 4:47:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码