什么是Netty,Netty各个组件介绍
本部分转载自 Java技术债务【什么是Netty?为什么使用Netty?Netty有哪些组件?】 原文链接:https://blog.csdn.net/qq_40124555/article/details/122993394 1、Netty 是一个 基于 NIO 的 client-server(客户端服务器)框架,使用它可以快速简单地开发网络应用程序。 2、它极大地简化并优化了 TCP 和 UDP 套接字服务器等网络编程,并且性能以及安全性等很多方面甚至都要更好。 3、支持多种协议 如 FTP,SMTP,HTTP 以及各种二进制和基于文本的传统协议。 用官方的总结就是:Netty 成功地找到了一种在不妥协可维护性和性能的情况下实现易于开发,性能,稳定性和灵活性的方法。
除了上面之外,很多开源项目比如我们常用的 Dubbo、RocketMQ、Elasticsearch、gRPC 等等都用到了 Netty。
为什么使用Netty
相比于直接使用 JDK 自带的 NIO 相关的 API 来说更加易用。 统一的 API,支持多种传输类型,阻塞和非阻塞的。 简单而强大的线程模型。 自带编解码器解决 TCP 粘包/拆包问题。 自带各种协议栈。 真正的无连接数据包套接字支持。 比直接使用 Java 核心 API 有更高的吞吐量、更低的延迟、更低的资源消耗和更少的内存复制。 安全性不错,有完整的 SSL/TLS 以及 StartTLS 支持。 社区活跃、成熟稳定,经历了大型项目的使用和考验,而且很多开源项目都使用到了 Netty, 比如我们经常接触的 Dubbo、RocketMQ 等等。
应用场景
NIO 可以做的事情 ,使用 Netty 都可以做并且更好。Netty 主要用来做网络通信 : 作为 RPC 框架的网络通信工具 :我们在分布式系统中,不同服务节点之间经常需要相互调用,这个时候就需要 RPC 框架了。不同服务节点之间的通信是如何做的呢?可以使用 Netty 来做。比如我调用另外一个节点的方法的话,至少是要让对方知道我调用的是哪个类中的哪个方法以及相关参数吧! 实现一个自己的 HTTP 服务器 :通过 Netty 我们可以自己实现一个简单的 HTTP 服务器,这个大家应该不陌生。说到 HTTP 服务器的话,作为 Java 后端开发,我们一般使用 Tomcat 比较多。一个最基本的 HTTP 服务器可要以处理常见的 HTTP Method 的请求,比如 POST 请求、GET 请求等等。 实现一个即时通讯系统 :使用 Netty 我们可以实现一个可以聊天类似微信的即时通讯系统, 实现消息推送系统 :市面上有很多消息推送系统都是基于 Netty 来做的。
Netty 的高性能表现
心跳,对服务端:会定时清除闲置会话 inactive(netty5),**对客户端:**用来检测会话是否断开,是否重来,检测网络延迟,其中 idleStateHandler 类 用来检测会话状态 **串行无锁化设计,**即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。表面上看,串行化设计似乎 CPU 利用率不高,并发程度不够。但是,通过调整 NIO 线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。 可靠性,链路有效性检测:链路空闲检测机制,读/写空闲超时机制;内存保护机制:通过内存池重用 ByteBuf;ByteBuf 的解码保护;优雅停机:不再接收新消息、退出前的预处理操作、资源的释放操作。 Netty 安全性:支持的安全协议:SSL V2 和 V3,TLS,SSL 单向认证、双向认证和第三方 CA认证。 高效并发编程的体现:volatile 的大量、正确使用;CAS 和原子类的广泛使用;线程安全容器的使用;通过读写锁提升并发性能。IO 通信性能三原则:传输(AIO)、协议(Http)、线程(主从多线程) 流量整型的作用(变压器):防止由于上下游网元性能不均衡导致下游网元被压垮,业务流中断;防止由于通信模块接受消息过快,后端业务线程处理不及时导致撑死问题
Netty核心组件
ByteBuf
本部分转载自thinking_fioa【Netty专栏 ( 三)——— Netty的ByteBuf】 原文链接:https://blog.csdn.net/thinking_fioa/article/details/80795673 Netty ByteBuf 优势 Netty 提供了ByteBuf,来替代Java NIO的 ByteBuffer 缓,来操纵内存缓冲区。 与Java NIO的 ByteBuffer 相比,ByteBuf的优势如下: Pooling (池化,这点减少了内存复制和GC,提升效率) 可以自定义缓冲类型 通过一个内置的复合缓冲类型实现零拷贝 扩展性好,比如 StringBuffer 不需要调用 flip()来切换读/写模式 读取和写入索引分开 方法链 引用计数
ByteBuf类 ----- Netty的数据容器
ByteBuf如何工作的
ByteBuf维护两个不同的索引: 读索引(readerIndex)和写索引(writerIndex)。如下图: ByteBuf维护了readerIndex和writerIndex索引(比nio的ByteBuff更加简单) 当readerIndex > writerIndex时,则抛出IndexOutOfBoundsException ByteBuf容量 = writerIndex。 ByteBuf可读容量 = writerIndex - readerIndex readXXX()和writeXXX()方法将会推进其对应的索引。自动推进 getXXX()和setXXX()方法将对writerIndex和readerIndex无影响 5.2.2 ByteBuf的使用模式 ByteBuf本质是: 一个由不同的索引分别控制读访问和写访问的字节数组。请记住这句话。ByteBuf共有三种模式: 堆缓冲区模式(Heap Buffer)、直接缓冲区模式(Direct Buffer)和复合缓冲区模式(Composite Buffer)
- 堆缓冲区模式(Heap Buffer)
堆缓冲区模式又称为:支撑数组(backing array)。将数据存放在JVM的堆空间,通过将数据存储在数组中实现 堆缓冲的优点: 由于数据存储在Jvm堆中可以快速创建和快速释放,并且提供了数组直接快速访问的方法 堆缓冲的缺点: 每次数据与I/O进行传输时,都需要将数据拷贝到直接缓冲区
public static void heapBuffer() {
ByteBuf heapBuf = Unpooled.buffer();
if (heapBuf.hasArray()) {
byte[] array = heapBuf.array();
int offset = heapBuf.arrayOffset() + heapBuf.readerIndex();
int length = heapBuf.readableBytes();
handleArray(array, offset, length);
}
}
- 直接缓冲区模式(Direct Buffer)
Direct Buffer属于堆外分配的直接内存,不会占用堆的容量。适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好
Direct Buffer的优点: 使用Socket传递数据时性能很好,避免了数据从Jvm堆内存拷贝到直接缓冲区的过程。提高了性能 Direct Buffer的缺点: 相对于堆缓冲区而言,Direct Buffer分配内存空间和释放更为昂贵 对于涉及大量I/O的数据读写,建议使用Direct Buffer。而对于用于后端的业务消息编解码模块建议使用Heap Buffer
public static void directBuffer() {
ByteBuf directBuf = Unpooled.directBuffer();
if (!directBuf.hasArray()) {
int length = directBuf.readableBytes();
byte[] array = new byte[length];
directBuf.getBytes(directBuf.readerIndex(), array);
handleArray(array, 0, length);
}
}
- 复合缓冲区模式(Composite Buffer)
Composite Buffer是Netty特有的缓冲区。本质上类似于提供一个或多个ByteBuf的组合视图,可以根据需要添加和删除不同类型的ByteBuf。
想要理解Composite Buffer,请记住:它是一个组合视图。它提供一种访问方式让使用者自由的组合多个ByteBuf,避免了拷贝和分配新的缓冲区。 Composite Buffer不支持访问其支撑数组。因此如果要访问,需要先将内容拷贝到堆内存中,再进行访问 下图是将两个ByteBuf:头部+Body组合在一起,没有进行任何复制过程。仅仅创建了一个视图
public static void byteBufComposite() {
CompositeByteBuf messageBuf = Unpooled.compositeBuffer();
ByteBuf headerBuf = Unpooled.buffer();
ByteBuf bodyBuf = Unpooled.directBuffer();
messageBuf.addComponents(headerBuf, bodyBuf);
messageBuf.removeComponent(0);
for (ByteBuf buf : messageBuf) {
System.out.println(buf.toString());
}
}
字节级操作
随机访问索引 ByteBuf的索引与普通的Java字节数组一样。第一个字节的索引是0,最后一个字节索引总是capacity()-1。请记住下列两条,非常有用:
readXXX()和writeXXX()方法将会推进其对应的索引readerIndex和writerIndex。自动推进 getXXX()和setXXX()方法用于访问数据,对writerIndex和readerIndex无影响
public static void byteBufRelativeAccess() {
ByteBuf buffer = Unpooled.buffer();
for (int i = 0; i < buffer.capacity(); i++) {
byte b = buffer.getByte(i);
System.out.println((char) b);
}
}
顺序访问索引
Netty的ByteBuf同时具有读索引和写索引,但JDK的ByteBuffer只有一个索引,所以JDK需要调用flip()方法在读模式和写模式之间切换。
- ByteBuf被读索引和写索引划分成3个区域:可丢弃字节区域,可读字节区域和可写字节区域
5.3.3 可丢弃字节区域 可丢弃字节区域是指:[0,readerIndex)之间的区域。可调用discardReadBytes()方法丢弃已经读过的字节。
-
discardReadBytes()效果 ----- 将可读字节区域(CONTENT)[readerIndex, writerIndex)往前移动(减)readerIndex位,同时修改读索引和写索引。 -
discardReadBytes()方法会移动可读字节区域内容(CONTENT)。如果频繁调用,会有多次数据复制开销,对性能有一定的影响
可写字节区域
可写字节区域是指:[writerIndex, capacity)之间的区域。任何名称以write开头的操作方法都将改变writerIndex的值。
可读字节区域
可读字节区域是指:[readerIndex, writerIndex)之间的区域。任何名称以read和skip开头的操作方法,都会改变readerIndex索引。
索引管理
-
markReaderIndex()+resetReaderIndex() ----- markReaderIndex()是先备份当前的readerIndex,resetReaderIndex()则是将刚刚备份的readerIndex恢复回来。常用于dump ByteBuf的内容,又不想影响原来ByteBuf的readerIndex的值 -
readerIndex(int) ----- 设置readerIndex为固定的值 -
writerIndex(int) ----- 设置writerIndex为固定的值 -
clear() ----- 效果是: readerIndex=0, writerIndex(0)。不会清除内存 -
调用clear()比调用discardReadBytes()轻量的多。仅仅重置readerIndex和writerIndex的值,不会拷贝任何内
查找操作(indexOf)
查找ByteBuf指定的值。类似于,String.indexOf(“str”)操作
- 最简单的方法 ----- indexOf()
- 利用ByteProcessor作为参数来查找某个指定的值。
代码:
public static void byteProcessor() {
ByteBuf buffer = Unpooled.buffer();
buffer.indexOf(buffer.readerIndex(), buffer.writerIndex(), (byte)8);
int index = buffer.forEachByte(ByteProcessor.FIND_CR);
}
派生缓冲区 ----- 视图
派生缓冲区为ByteBuf提供了一个访问的视图。视图仅仅提供一种访问操作,不做任何拷贝操作。下列方法,都会呈现给使用者一个视图,以供访问:
1. duplicate()
2. slice()
3. slice(int, int)
4. Unpooled.unmodifiableBuffer(...)
5. Unpooled.wrappedBuffer(...)
6. order(ByteOrder)
7. readSlice(int)
理解
-
上面的6中方法,都会返回一个新的ByteBuf实例,具有自己的读索引和写索引。但是,其内部存储是与原对象是共享的。这就是视图的概念 -
请注意:如果你修改了这个新的ByteBuf实例的具体内容,那么对应的源实例也会被修改,因为其内部存储是共享的 -
如果需要拷贝现有缓冲区的真实副本,请使用copy()或copy(int, int)方法。 -
使用派生缓冲区,避免了复制内存的开销,有效提高程序的性能
public static void byteBufSlice() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf sliced = buf.slice(0, 15);
System.out.println(sliced.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == sliced.getByte(0);
}
public static void byteBufCopy() {
Charset utf8 = Charset.forName("UTF-8");
ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8);
ByteBuf copy = buf.copy(0, 15);
System.out.println(copy.toString(utf8));
buf.setByte(0, (byte)'J');
assert buf.getByte(0) == copy.getByte(0);
}
读/写操作
如上文所提到的,有两种类别的读/写操作:
-
get()和set()操作 ----- 从给定的索引开始,并且保持索引不变 -
read()和write()操作 ----- 从给定的索引开始,并且根据已经访问过的字节数对索引进行访问 -
下图给出get()操作API,对于set()操作、read()操作和write操作可参考书籍或API
更多的操作
ByteBuf分配
创建和管理ByteBuf实例的多种方式:按需分配(ByteBufAllocator)、Unpooled缓冲区和ByteBufUtil类
5.5.1 按序分配: ByteBufAllocator接口 Netty通过接口ByteBufAllocator实现了(ByteBuf的)池化。Netty提供池化和非池化的ButeBufAllocator:
-
ctx.channel().alloc().buffer() ----- 本质就是: ByteBufAllocator.DEFAULT -
ByteBufAllocator.DEFAULT.buffer() ----- 返回一个基于堆或者直接内存存储的Bytebuf。默认是堆内存 -
ByteBufAllocator.DEFAULT ----- 有两种类型: UnpooledByteBufAllocator.DEFAULT(非池化)和PooledByteBufAllocator.DEFAULT(池化)。对于Java程序,默认使用PooledByteBufAllocator(池化)。对于安卓,默认使用UnpooledByteBufAllocator(非池化) -
可以通过BootStrap中的Config为每个Channel提供独立的ByteBufAllocator实例
解释:
-
上图中的buffer()方法,返回一个基于堆或者直接内存存储的Bytebuf ----- 缺省是堆内存。源码: AbstractByteBufAllocator() { this(false); } -
ByteBufAllocator.DEFAULT ----- 可能是池化,也可能是非池化。默认是池化(PooledByteBufAllocator.DEFAULT)
5.5.2 Unpooled缓冲区 ----- 非池化 Unpooled提供静态的辅助方法来创建未池化的ByteBuf。
注意:
- 上图的buffer()方法,返回一个未池化的基于堆内存存储的ByteBuf
- wrappedBuffer() ----- 创建一个视图,返回一个包装了给定数据的ByteBuf。非常实用
5.5.3 ByteBufUtil类 ByteBufUtil类提供了用于操作ByteBuf的静态的辅助方法: hexdump()和equals
-
hexdump() ----- 以十六进制的表示形式打印ByteBuf的内容。非常有价值 -
equals() ----- 判断两个ByteBuf实例的相等性
netty零拷贝,对象池,内存池
参考文章文件io和网络io中有对这三点的介绍
当需要连接客户端或者服务器绑定指定端口是需要使用Bootstrap,ServerBootstrap有两种类型,一种是用于客户端的Bootstrap,一种是用于服务端 的ServerBootstrap。不管程序使用哪种协议,无论是创建一个客户端还是服务器都需要使 用“引导”。
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap Bootstrap b = new Bootstrap();
b.group(group). ......
ChannelFuture f = b.connect(host, port).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
ServerBootstrap 客户端的启动引导类/辅助类
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup). ......
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
}
Bootstrap 通常使用 connet() 方法连接到远程的主机和端口,作为一个 Netty TCP 协议通信中的客户端。另外,Bootstrap 也可以通过 bind() 方法绑定本地的一个端口,作为 UDP 协议通信中的一端。 ServerBootstrap通常使用 bind() 方法绑定本地的端口上,然后等待客户端的连接。
Bootstrap 只需要配置一个线程组— EventLoopGroup,而 ServerBootstrap需要配置两个线程组— EventLoopGroup ,一个用于接收连接,一个用于具体的处理。
分类 | Bootstrap | ServerBootstrap |
---|
网络功能 | 连接到远程主机和端口 | 绑定本地端口 | EventLoopGroup 数量 | 1 | 2 |
一个 ServerBootstrap 可以认为有2个 Channel 集合, 第一个集合包含一个单例 ServerChannel,代表持有一个绑定了本地端口的 socket; 第二集合包含所有创建的 Channel,处理服务器所接收到的客户端进来的连接。
EventLoop和EventLoopGroup
EventLoop 定义了 Netty 的核心抽象,用于处理连接的生命周期中所发生的事件。
EventLoop 的主要作用实际就是负责监听网络事件并调用事件处理器进行相关 I/O 操作的处理。
Channel 和 EventLoop 直接有啥联系呢?
Channel 为 Netty 网络操作(读写等操作)抽象类,EventLoop 负责处理注册到其上的Channel 处理 I/O 操作,两者配合参与 I/O 操作。
EventLoopGroup包含多个EventLoop,每个EventLoop通常内部包含一个线程。EventLoop在处理IO事件时在自己的Thread线程上进行,从而保证线程安全
NioEventLoopGroup在未指定线程数时,默认时当前cpu线程数*2 EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般会有多个 EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。 EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop 线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的Selector 实例BossEventLoop 不断轮询Selector 将连接事件分离出来 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给WorkerEventLoopGroup WorkerEventLoopGroup 会由 next 选择其中一个 EventLoop来将这个SocketChannel 注册到其维护的Selector 并对其后续的 IO 事件进行处理
Channel通道
Channel 接口是 Netty 对网络操作抽象类,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。
比较常用的Channel接口实现类是NioServerSocketChannel(服务端)和NioSocketChannel(客户端),这两个 Channel 可以和 BIO 编程模型中的ServerSocket以及Socket两个概念对应上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 类的复杂性。
Channel channel = ...;
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (future.isSuccess()) {
} });
- 创建 ByteBuf 保存写的数据
- 写数据,并刷新
- 添加 ChannelFutureListener 即可写操作完成后收到通知
- 写操作没有错误完成
- 写操作完成时出现错误
channel生命周期 状态 描述 ChannelUnregistered Channel 已经被创建,但还未注册到EventLoop ChannelRegistered Channel 已经被注册到了EventLoop ChannelActive Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了 ChannelInactive Channel 没有连接到远程节点
作用:
SelectonKey 状态
- OP_ACCEPT 操作集位用于插座接受操作。
- OP_CONNECT 用于套接字连接操作的操作集位。
- OP_READ 读操作的操作位。
- OP_WRITE 写操作的操作位。
ChannelHandler
ChannelHandler是消息的处理器,负责读写操作和客户端连接等。
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。 ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方 便使用期间,可以继承它的子类 子类>>Netty自带的ChannelHandler ChannelPipeline 为 ChannelHandler 的链,提供了一个容器并定义了用于沿着链传播入站和出站事件流的 API 。当 Channel 被创建时,它会被自动地分配到它专属的 ChannelPipeline。
可以在 ChannelPipeline 上通过 addLast() 方法添加一个或者多个ChannelHandler ,因为一个数据或者事件可能会被多个 Handler 处理。当一个 ChannelHandler 处理完之后就将数据交给下一个 ChannelHandler 。
Netty 发送消息有两种方式。您可以直接写消息给 Channel 或写入 ChannelHandlerContext 对象。主要的区别是, 前一种方法会导致消息从 ChannelPipeline的 尾部开始,而 后者导致消息从 ChannelPipeline 下一个处理器 开始。 ChannelHandler的子接口:
ChannelInboundHandler——处理入站数据以及各种状态变化; ChannelOutboundHandler——处理出站数据并且允许拦截所有的操作; ChannelDuplexHandler——既可以处理入站数据,也可以处理出站数据。
- SslChannel:负责对请求进行加密和解密,是放在ChannelPipeline的第一 个ChannelHandler
HttpClientCodec和HttpServerCodec:HttpClientCodec负责将请求字节码解码为HttpRequest、HttpContent和LastHttpContent消息,以及对应的转为字节;HttpServerCodec负责服务端中将字节码解析成HttpResponse、HttpContent和LastHttpContent消息,以及对应的将它转为字节。
HttpServerCodec 里面组合了HttpResponseEncoder和HttpRequestDecoder HttpClientCodec 里面组合了HttpRequestEncoder和HttpResponseDecoder
HttpObjectAggregate:负责将http聚合成完整的消息,而不是原始的多个部分。
HttpContentCompressor和HttpContentDecompressor:HttpContentCompressor用于服务器压缩数据,HttpContentDecompressor用于客户端解压数据
IdleStateHandler:连接空闲时间过长,触发IdleStateEvent事件
ReadTimeoutHandler:指定时间内没有收到任何入站数据,抛出ReadTimeoutException异常,关闭Channel。
WriteTimeoutHandler:指定时间内没有收到任何出站数据写入,抛出WriteTimeoutException异常,关闭Channel。
DelimiterBasedFrameDecoder:使用任何用户提供的分隔符来提取帧的通用解码器。
FixedLengthFrameDecoder:提取在调用构造函数时的定长帧。
ChuckedWriterHandler:将大型文件从文件系统复制到内存【DefaultFileRegion进行大型文件传输】 注意:
ChannelHandler实例如果带有 @Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。ChannelHandler实例如果带有@Sharable注解则可以被添加到多个ChannelPipeline。也就是说单个ChannelHandler实例可以有多个ChannelHandlerContext,因此可以调用不同ChannelHandlerContext获取同一个ChannelHandler。如果添加不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。
出站ChannelOutboundHandler接口
出站操作和数据将由ChannelOutboundHandler处理。它的方法将被Channel、ChannelPipeline以及ChannelHandlerContext调用。ChannelOutboundHandler的一个强大的功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。
public interface ChannelOutboundHandler extends ChannelHandler {
入站ChannelInboundHandler接口
ChannelPipeline
ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解: ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站 事件和出站操作) ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事 件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互 在Netty中每个Channel都有且仅有一个ChannelPipeline与之对应
一个channel对应一个pipeline,一个pipeline对应n个ChannelHandler
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰 ChannelPipeline 调度 handler
Context包装handler,多个Context在pipeline中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。 而节点中间的传递通过AbstractChannelHandlerContext类内部的fire系列方法,找 到当前节点的下一个节点不断的循环传播。是一个过滤器形式完成对handler 的调度
ChannelPipeline 调度 handler
Context包装handler,多个Context在pipeline中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。 而节点中间的传递通过AbstractChannelHandlerContext类内部的fire系列方法,找 到当前节点的下一个节点不断的循环传播。是一个过滤器形式完成对handler 的调度
ChannelHandlerContext
保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象 即ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler, 同 时ChannelHandlerContext 中也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用. ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的关联,每当有ChannelHandler添加到ChannelPipeline中时,都会创建ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。
ChannelHandlerContext有很多的方法,其中一些方法也存在于Channel和ChannelPipeline本身上,但是有一点重要的不同。如果调用Channel或者ChannelPipeline上的这些方法,它们将沿着整个ChannelPipeline进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler开始,并且只会传播给位于该ChannelPipeline中的下一个能够处理该事件的ChannelHandler。
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel();
EventExecutor executor();
String name();
ChannelHandler handler();
boolean isRemoved();
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
@Override
ChannelHandlerContext read();
@Override
ChannelHandlerContext flush();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
}
Option参数
本部分转载自 我犟不过你【netty(十九)Netty优化 - option中的参数优化】 原文链接:https://www.jianshu.com/p/1ddedcf82e79
什么是option?
前面学习了Netty的服务端,和客户端,知道了创建服务要分别使用ServerBootStrap和BootStrap,不知道有没有关注到其中有一个方法叫做Option的? 我们分别看下ServerBootStrap和BootStrap的option: 1)ServerBootStrap
如上图所示,有两种option,一个是自己的option(ServerSocketChannel),一个childOption(ScoketChannel的option)。 2)BootStrap 只有一个option方法。 无论是上述哪两种,参数都是ChannelOption,而这个ChannelOption Netty已经帮我们准备好了,可以直接使用。 下面我们会针对几种重要的配置讲解一下。
CONNECT_TIMEOUT_MILLIS
ScoketChannel的参数。 用在客户端建立连接时,如果超过指定的时间仍未连接,则抛出timeout异常。
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,500);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
worker.shutdownGracefully();
}
}
只启动客户端,抛出如下异常:
Exception in thread "main" io.netty.channel.ConnectTimeoutException: connection timed out: /127.0.0.1:8080
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
若果该参数设置过长,且服务端确实没启动,则会抛出java层面的异常,拒绝连接:
Exception in thread "main" io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: /127.0.0.1:8080
Caused by: java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
SO_TIMEOUT
这个参数适用于阻塞IO,比如阻塞IO当中的read,accept等方法,修饰阻塞的,如果不想一直阻塞,可以通过改参数设置超时时间。 不要与CONNECT_TIMEOUT_MILLIS弄混了。
SO_BACKLOG
ServerSocketChannal 参数。 在了解这个参数之前,要先了解下TCP的三次握手,sync_queue(半连接队列)和accept_queue(全连接队列)。 其中半连接队列是在首次握手时,将请求放入半连接队列,当三次握手全部成功后,将请求从半连接队列放入全连接队列。 下图展示netty和三次握手的关系:
- 第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
- 第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
- 第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue。
在上面的过程中,提到的sync_queue和accept_queue是我们本篇文章需要关注的重点。 在linux2.2之前,backlog包括了两个队列的大小。在之后的版本当中,由如下两个参数来控制:
- sync queue - 半连接队列
- 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
- accept queue - 全连接队列
- 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数比较,取二者的较小值。
- 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client。
下面回归正题,在netty当中,通过ChannelOption.SO_BACKLOG设置大小,如下所示:
public class Server {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.option(ChannelOption.SO_BACKLOG,2);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
System.out.println("server error:" + e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
如上代码所示,设置了一个backlog为2的值。然后我们需要启动至少三个客户端看结果。 通过前面的三次握手的图,可以知道,只有当服务端处理不过来时,才会使用全连接队列(完成三次握手,建立tcp连接,但是没有被accept,对于客户端来说,连接建立成功),并将其占满,否则会直接走accept()方法,导致我们看不到测试结果。 所以我们这里不做测试了。 我们看下这个backlog的默认值在nio当中是多少: 在NIO当中backlog在ServerSocketChannel当中的bind方法被调用,所以我们从这里跟踪进去找到bind方法:
public final ServerSocketChannel bind(SocketAddress local)
throws IOException
{
return bind(local, 0);
}
查看bind被哪些地方调用,NioServerSocketChannel:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
跟踪config.getBacklog():
private final ServerSocketChannelConfig config;
这个config是接口,直接看它的实现DefaultServerSocketChannelConfig:
private volatile int backlog = NetUtil.SOMAXCONN;
找SOMAXCONN:
public static final int SOMAXCONN;
找到SOMAXCONN赋值的位置,默认是windows200,Linux或mac默认128
ulimit
属于操作系统参数。 使用ulimit -n 可以查看当前的最大打开文件数。使用ulimit -a 可以查看当前系统的所有限制值。 linux默认1024,当服务器负载较大时,会发生too many open files的错误,所以我们为了提供并发量,需要手动将其调整。 使用如下命令可以将其调整,但是是临时性的,可以考虑将其放在启动脚本当中:
ulimit -n 4096
TCP_NODELAY
部分参数的含义在IO基础篇中已经讲过了,看下这个TCP_NODELAY 在写游戏服务器的时候,连接建立后通常会设置一个选项:TCP_NODELAY,该选项会禁用Nagle算法。 Nagle算法的作用是减少小包的数量,它是如何做到的呢(注意我是抄书的,如果不对欢迎指正)?
- 什么是小包:小于 MSS(一个TCP段在网络上传输的最大尺寸) 的都可以定义为小包。
- 如果前一个TCP段发送出去后,还没有收到对端的ACK包,那么接下来的发送数据会先累积起来不发。
- 等到对端返回ACK,或者数据累积已快达到MSS,才会发送出去。
拿telnet的字符回响来示例,在默认情况下(Nagle算法开启),假如敲键盘的速度是250ms一下,网络RTT(一个包到达目标,然后目标回应的总时间)是600ms: Nagle算法经常还和对端的延迟ACK算法碰到一起,TCP端收到包后,不马上ACK回去,而是延迟一点点时间,如果这段时间有包要发送回去,则上一个ACK刚好合并在一起发。从这可看出Nagle算法和延迟ACK算法的目的都是一样的:减少TCP段的传输数量。 但在网络游戏这种实时通信中,这种减少包的做法,如果网络较差的时候,可能会引起比较大的波动,比如玩家正在PK,发了技能没有很快的反馈,过一会儿很多技能效果一起回来,这个体验是比较差的。 使用TCP_NODELAY可以禁用Nagle算法,坏处就是小包比较多,对网络交通会有负担,拿个不一定恰当的比喻: 开了Nagle算法相当于大家尽量做公交车,这样路上的小车少了,交通会通顺一些,但是要等公交车;如果大家都开车,对交通的负载会加大。
SO_SNDBUF & SO_RCVBUF
tcp缓冲区,用于缓冲,计算机发送速度大于网络io速度,接收速度大于计算机处理速度的。 SO_SNDBUF 属于 SocketChannal 参数 SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数 这两个参数不建议我们手动进行设置,因为操作系统会根据当前占用,进行自动的调整。
ALLOCATOR
属于 SocketChannal 参数。 ByteBuf的分配器。
serverBootstrap.childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT);
这个参数只有一个DEFAULT可以使用。 这个参数与ch.alloc().buffer()命令有关,关系着我们分配的buf是池化还是非池化,是直接内存还是堆内存。 我们从上面的Default跟踪进去:
serverBootstrap.group(bossGroup, workerGroup).
group方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
分别设置childGroup和parentGroup 入参为EventLoopGroup
找到对其赋值的位置,发现了如下的静态代码块,此处就是设置buf是pooled还是unpooled,通过环境变量:“io.netty.allocator.type” 指定,我们可以在启动项目时指定-Dio.netty.allocator.type=unpooled设置成非池化。从源码可以看到,安卓是unpooled,其他是pooled。
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
重点关注上述代码后半段!SystemPropertyUtil.getBoolean(“io.netty.noPreferDirect”, false);,我们可用通过-Dio.netty.noPreferDirect=true环境变量指定我们使用堆内存。
RCVBUF_ALLOCATOR
属于 SocketChannal 参数。 控制 netty 接收缓冲区大小。 这个RCVBUF_ALLOCATOR不要与前面的ALLOCATOR弄混。 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定。 通俗的讲在handler内部分配的byteBuf可以是直接内存,也可以是堆内存,但是经过网络io的内存,netty会强制为直接内存。
|