Netty
一、Socket回顾与I/O模型
1、Socket网络编程回顾
1.1 Socket概述
- Socket(套接字)就是两台主机之间逻辑的端点。TCP/IP协议是传输层协议,主要解决数据如何在网络中传输,而HTTP是应用层协议,主要解决如何包装数据。Socket是通信的基石,是支持TCP/IP协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信息:连接使用的协议、本地主机的IP地址、本地进程的协议端口、、远程主机的IP地址、远程进程的协议端口
1.2 Socket整体流程
-
socket编程主要设计客户端和服务端两个方面,首先是在服务端创建一个服务器套接字对象(ServerSocket),并把它附加到一个端口上,服务器从这个端口监听连接。端口号的范围是0到65535,但是0到1024是为特权服务保留的端口号,可以选择任意一个当前没有被其它进程使用的端口 -
客户端请求与服务器进行连接的时候,根据服务器的域名或者IP地址,加上端口号,打开一个套接字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作 -
代码实现
package online.yuanle.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ServerDemo {
public static void main(String[] args) throws IOException {
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(9999);
System.out.println("服务已启动");
while (true){
Socket socket = serverSocket.accept();
System.out.println("有客户端连接……");
executorService.execute(new Runnable() {
@Override
public void run() {
handle(socket);
}
});
}
}
private static void handle(Socket socket) {
try {
System.out.println("线程ID:" + Thread.currentThread().getId() + ",线程名称:" + Thread.currentThread().getName());
InputStream is = socket.getInputStream();
byte[] bytes = new byte[1024];
int read = is.read(bytes);
System.out.println("客户端:" + new String(bytes, 0, read));
OutputStream os = socket.getOutputStream();
os.write("没钱".getBytes());
}catch (IOException e){
e.printStackTrace();
}finally {
try {
socket.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
}
package online.yuanle.client;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ClientDemo {
public static void main(String[] args) throws IOException {
while (true) {
Socket socket = new Socket("127.0.0.1", 9999);
OutputStream os = socket.getOutputStream();
System.out.println("请输入:");
Scanner scanner = new Scanner(System.in);
String message = scanner.nextLine();
os.write(message.getBytes());
InputStream is = socket.getInputStream();
byte[] bytes = new byte[1024];
int read = is.read(bytes);
System.out.println("老板说:" + new String(bytes, 0, read).trim());
socket.close();
}
}
}
2. I/O模型
2.1 I/O模型说明
2.2 BIO(同步并阻塞IO)
-
Java BIO就是传统的socket编程 -
BIO:同步阻塞,服务期实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池的机制改善(实现多个客户连接服务器) -
工作机制: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Si52tstD-1650104078257)(RPC框架设计与分布式理论.assets/image-20220329221451684.png)] -
生活中的例子: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oeg9TPDl-1650104078258)(RPC框架设计与分布式理论.assets/image-20220329221510841.png)] -
BIO问题分析:
- 每个请求都需要创建独立的线程,与对应的客户端进行数据Read、Write,以及业务的处理
- 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在Read操作上,造成线程资源浪费
2.3 NIO(同步非阻塞IO)
-
同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就处理 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0rJkHcIt-1650104078258)(RPC框架设计与分布式理论.assets/image-20220329223447687.png)] -
生活中的例子: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ncxGuD6M-1650104078258)(RPC框架设计与分布式理论.assets/image-20220329223502153.png)]
2.4 AIO(异步非阻塞)
-
AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
Proactor模式是一个消息异步通知的设计模式,Proactor通知的不是就绪事件,而是操作完成事件,这也就是操作系统异步IO的主要模型
-
生活中的例子: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KwzvtInC-1650104078259)(RPC框架设计与分布式理论.assets/image-20220329223800349.png)]
2.5 BIO、NIO、AIO使用场景分析
- BIO(同步并阻塞)方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易于理解
- NIO(同步非阻塞)方式适用于连接数目较多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持
- AIO(异步非阻塞)方式适用于连接数目多且连接比较长的(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持
二、NIO编程
1、NIO介绍
-
JavaNIO全称 java non-blocking IO,是指JDK提供的新API,从JDK1.4开始,Java提供了一系列改进的输入输出的新特性,被统称为NIO(即New IO),是同步非阻塞的 -
NIO有三个核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器) -
NIO是面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络 -
JavaNIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个
2、NIO和BIO的比较
- BIO是以流的方式处理数据,而NIO以缓冲区的方式处理数据,缓冲区I/O的效率比流I/O高很多
- BIO是阻塞的,NIO是非阻塞的
- BIO基于字节流或字符流进行操作,而NIO基于Channel(通道)和Buffer(缓冲区)进行操作,数据总是从通道的读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
3、NIO三大核心原理示意图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eq4SnB6d-1650104078260)(RPC框架设计与分布式理论.assets/image-20220329230126315.png)]
- 每个Channel都会对应一个Buffer
- Selector对应一个线程,一个线程对应多个channel(连接)
- 每个channel都会注册到Selector选择器上
- Selector不断轮询查询Channel上的事件,事件是通道Channel非常重要的概念
- Selector会根据不同的事件,完成不同的处理操作
- Buffer就是内存块,底层就是一个数组
- 数据的读取写入是通过Buffer,这个和BIO,BIO中要么是输入流或者是输出流,不能双向,但是NIO的Buffer是可以读也可以写,channel是双向的
4、缓冲区
4.1 基本介绍
? 缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel提供从网络读取数据的渠道,但是读取或写入的数据都必须经由Buffer
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Usd5PBKh-1650104078261)(RPC框架设计与分布式理论.assets/image-20220329233413848.png)]
4.2 Buffer常用API介绍
- Buffer类及其子类
在NIO中,Buffer是一个顶级父类,它是一个抽象类,类的层级关系图,常用的缓冲区分别对应byte,short, int, long,float,double,char 7种
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Y8ndlT1j-1650104078261)(RPC框架设计与分布式理论.assets/image-20220329234904740.png)]
- 缓冲区对象创建
方法名 | 说明 |
---|
static ByteBuffer allocate(长度) | 创建byte类型的指定长度的缓冲区 | static ByteBuffer wrap(byte[] array) | 创建一个有内容的byte类型缓冲区 |
? 示例代码:
package online.yuanle.buffer;
import java.nio.ByteBuffer;
public class CreateBufferDemo {
public static void main(String[] args) {
ByteBuffer allocate = ByteBuffer.allocate(5);
for (int i = 0; i < 5; i++) {
System.out.println(allocate.get());
}
ByteBuffer wrap = ByteBuffer.wrap("yuanle".getBytes());
for (int i = 0; i < 6; i++) {
System.out.println(wrap.get());
}
}
}
- 缓冲区对象添加数据
方法名 | 说明 |
---|
int position/ position(int newPosition) | 获得当前要操作的索引/ 修改当前要操作的索引位置 | int limit/ limit(int newLimit) | 最多能操作的索引位置/ 修改最多能操作的索引位置 | int capacity() | 返回缓冲区总长度 | int remaining/ boolean hasRemaining() | 还有多少能操作索引个数/是否还能操作 | put(byte b)/ put(byte[] src) | 添加一个字节/添加字节数组 |
? 图解:
? [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-meJvMjsC-1650104078262)(RPC框架设计与分布式理论.assets/image-20220331212954776.png)]
? 示例代码:
package online.yuanle.buffer;
import java.nio.ByteBuffer;
public class PutBufferDemo {
public static void main(String[] args) {
ByteBuffer allocate = ByteBuffer.allocate(10);
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println("-------------------------");
allocate.put((byte) 97);
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println("----------------------------");
allocate.put("abc".getBytes());
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println("-----------------------");
allocate.put("123456".getBytes());
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println(allocate.hasRemaining());
System.out.println("--------------------");
allocate.position(0);
allocate.put("123".getBytes());
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println(allocate.hasRemaining());
}
}
- 缓冲区对象读取数据
方法名 | 说明 |
---|
flip() | 写切换读模式,limit设置position位置,position设置值0 | get() | 读一个字节 | get(byte[] dst) | 读多个字节 | get(int index) | 读指定索引的字节 | rewind() | 将position设置为0,可以重复读 | clear() | 切换写模式,position设置为0,limit设置为capacity | array() | 将缓冲区转换成字节数组返回 |
? 图解:flip()方法
? [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yFadaTcR-1650104078263)(RPC框架设计与分布式理论.assets/image-20220331220137902.png)]
? 图解:clear()方法
? [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9kNqTmku-1650104078263)(RPC框架设计与分布式理论.assets/image-20220331220203144.png)]
? 示例代码:
package online.yuanle.buffer;
import java.nio.ByteBuffer;
public class GetBufferDemo {
public static void main(String[] args) {
ByteBuffer allocate = ByteBuffer.allocate(10);
allocate.put("0123".getBytes());
System.out.println(allocate.position());
System.out.println(allocate.limit());
System.out.println(allocate.capacity());
System.out.println(allocate.remaining());
System.out.println("======================");
allocate.flip();
for (int i = 0; i < allocate.limit(); i++){
System.out.println(allocate.get());
}
System.out.println(allocate.get(1));
allocate.rewind();
byte[] bytes = new byte[4];
allocate.get(bytes);
System.out.println(new String(bytes));
System.out.println("=================");
byte[] array = allocate.array();
System.out.println(new String(array));
allocate.clear();
allocate.put("abc".getBytes());
System.out.println(new String(array));
}
}
注意事项:
- capacity:容量limit:界限(最多能读/写到哪里),position:位置(读、写哪个索引)
- 获取缓冲区里面数据之前,需要调用flip方法
- 再次写数据之前,需要调用clear方法,但是数据还未消失,等再次写入数据,被覆盖了才会消失
5、通道(channel)
5.1 基本介绍
-
通常来说NIO中的所有IO都是从Channel(通道)开始的。NIO的通道类似于流,但有些区别如下:
- 通道可以读也可以写,流一般来说是单向的(只能读或者只能写,所以之前所有的IO操作的时候需要分别创建一个输入流和一个输出流)
- 通道可以异步读写
- 通道总是基于缓冲区Buffer来读写
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vvR9sh6t-1650104078263)(RPC框架设计与分布式理论.assets/image-20220331221152407.png)]
5.2 Channel常用类介绍
-
Channel接口,常用的实现类有:
- FileChannel:用户文件的数据读写
- DatagramChannel用于UDP的数据读写
- ServerSocketChannel和SocketChannel用于TCP的数据读写
ServerSocketChannel类似于ServerSocket,SocketChannel类似Socket,可以完成客户端与服务端的通信工作
5.3 ServerSocketChannel
-
服务端实现步骤:
-
打开一个服务端通道 -
绑定对应的端口号 -
通道默认是在阻塞的,需要设置为非阻塞 -
检查是否有客户端连接,有客户端连接会返回对应的通道 -
获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中 -
给客户端回写数据 -
释放资源 -
代码实现: package online.yuanle.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
System.out.println("服务端启动成功……");
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null){
System.out.println("没有客户端连接……我去做别的事情");
Thread.sleep(1000);
continue;
}
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println("客户端消息:" + new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
socketChannel.write(ByteBuffer.wrap("没钱".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
}
5.4 SocketChannel
-
实现步骤:
- 打开通道
- 设置连接IP和端口号
- 写出数据
- 读取服务器写回的数据
- 释放资源
-
代码实现: package online.yuanle.channel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
socketChannel.write(ByteBuffer.wrap("老板还钱吧!".getBytes(StandardCharsets.UTF_8)));
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = socketChannel.read(allocate);
System.out.println("服务端消息" + new String(allocate.array(), 0, read, StandardCharsets.UTF_8));
socketChannel.close();
}
}
6、Selector(选择器)
6.1 基本介绍
- 可以用一个线程处理多个客户端的连接,就会使用到NIO的Selector(选择器),Selector能够检测多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mY6EyLUg-1650104078264)(RPC框架设计与分布式理论.assets/image-20220401231614466.png)]
在这种没有选择器的情况下,对应每个连接对应一个处理线程。但是连接并不能马上就会发送消息,所以还会产生资源浪费
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QjcvSfaO-1650104078264)(RPC框架设计与分布式理论.assets/image-20220401231725891.png)]
只有在通道真正有读写事件发生时,才会进行读写,就大大的减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,避免了多线程之间的上下文切换导致的开销
6.2 常用API介绍
-
Selector是一个抽象类 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iVGT95oL-1650104078264)(RPC框架设计与分布式理论.assets/image-20220401231935380.png)]
- Selector.open();//得到一个选择器对象
- Selector.select()://阻塞, 1000毫秒,监控所有注册的通道,当有对应的事件操作时,会将SelectKey放入集合内部并返回
- selector.selectedKeys()//返回存有SelectionKet的集合
-
SelectionKey [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-j03q9pjB-1650104078264)(RPC框架设计与分布式理论.assets/image-20220401232147876.png)]
- 常用方法
- SelectionKey.isAcceptable():是否是连接继续事件
- SelectionKey.isConnectable():是否是连接就绪事件
- SelectionKey.isReadable(): 是否是读就绪事件
- SelectionKey.isWritable(): 是否是写就绪事件
- SelectionKey中定义的4种事件
- SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
- SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功
- SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
- SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
6.3 Selector编码
-
服务器实现步骤
- 打开一个服务端通道
- 绑定对应的端口号
- 通道默认是阻塞的,需要设置为非阻塞
- 创建选择器
- 将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT
- 检查选择器是否有事件
- 获取事件集合
- 判断事件是否是客户端连接事件SelectionKey.isAcceptable()
- 得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ
- 判断是否是客户端读就绪事件SelectionKey.isReadable()
- 得到客户端通道,读取数据到缓冲区
- 给客户端回写数据
- 从集合中删除对应的事件, 因为防止二次处理
-
代码实现 package online.yuanle.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
public class NIOSelectorServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务端启动成功……");
while (true) {
int select = selector.select(2000);
if (select == 0){
System.out.println("没有事件发生....");
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()){
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客户端连接......");
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
if (selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
int read = socketChannel.read(allocate);
if (read > 0) {
System.out.println("客户端消息:" + new String(allocate.array(), 0, read, StandardCharsets.UTF_8));
socketChannel.write(ByteBuffer.wrap("没钱".getBytes(StandardCharsets.UTF_8)));
socketChannel.close();
}
}
iterator.remove();
}
}
}
}
三、Netty核心原理
1、Netty介绍
1.1原生NIO存在的问题
-
NIO的类库和API繁杂,使用麻烦:需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等 -
需要具备其它的额外技能:要熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的NIO程序。 -
开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等 -
JDK NIO的 BUG:臭名昭著的 Epoll BUG,它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本该问题仍旧存在,没有被根本解决。
在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epollbug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。
1.2 Netty概述
- Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络IO程序。Netty是一个基于NIO的网络编程框架,使用Netty可以帮助我们快速、简单的开发出一个网络应用,相当于家安花了NIO的开发过程。作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的ELasticsearch、Dubbo框架内部都采用了Netty。
? [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QCYQojN0-1650104078265)(RPC框架设计与分布式理论.assets/image-20220406194712751.png)]
- 从图中可以看出Netty的强大之处:零拷贝、可拓展事件模型;支持TCP、UDP、HTTP、WebSocket等协议;提供安全传输、压缩、大文件传输、编解码支持等。
- 具备如下优点:
- 设计优雅,提供阻塞和非阻塞的Socket;提供灵活可拓展的事件模型;提供高度可定制的线程模型。
- 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。
- 提供安全传输特性
- 支持多种主流协议;预制多种编解码功能,支持用户开发私有协议。
2、线程模型
2.1 线程模型基本介绍
? 不同的线程模式,对程序的性能有很大影响
2.2 传统阻塞I/O服务模型
? 采用阻塞IO模式获取输入的数据,每个连接都需要独立的的线程完成数据的输入,业务处理和数据返回工作做
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xB0rCzOi-1650104078265)(RPC框架设计与分布式理论.assets/image-20220406220602862.png)]
存在问题:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read操作,造成线程资源浪费
2.3 Reactor模型
? Reactor模式,通过一个或多个输入同时传递给服务处理器的模式,服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程,因此Reactor模式也叫Dispatcher模式,Reactor模式使用IO复用监听事件,收到事件后,分发给某个线程(进程),这就是网络服务器高并发处理关键
-
单Reactor单线程 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-6lMmWLgz-1650104078266)(RPC框架设计与分布式理论.assets/image-20220406221401872.png)]
- Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor对象通过Selector监控客户端请求事件,收到事件后通过Dispatch进行分发
- 是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理
- Handler会完成Read ----->业务处理------>的完整业务流程
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
- 性能问题:只有一个线程,无法完全发挥多核CPU的性能。Handler在处理某个连接上的业务时,整个进程无法处理其它连接事件,很容易导致性能瓶颈
- 可靠性问题:线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
-
单Reactor多线程 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wP0lVWgS-1650104078266)(RPC框架设计与分布式理论.assets/image-20220406222841315.png)]
- Reactor对象通过Selector监控客户端请求事件,收到事件后,通过dispatch进行分发
- 如果建立连接请求,则由Acceptor通过accept处理连接请求
- 如果不是连接请求,则由Reactor分发调用连接对应的Handler来处理
- handler只负责响应事件,不做具体的业务处理,通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务
- worker线程池会分配独立线程完成真正的业务,并将结果返回给handler
- handler收到响应后,通过send将结果返回给client
优点:可以充分的利用多核CPU的处理能力
缺点:多线程数据共享和访问比较复杂,Reactor处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈
-
主从Reactor多线程 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kADUCNsc-1650104078266)(RPC框架设计与分布式理论.assets/image-20220406223453111.png)]
- Reactor主线程MainReactor对象通过select监听客户端连接事件,收到事件后,通过Acceptor处理客户端连接事件
- 当Acceptor处理完客户端连接事件之后(与客户端建立好Socket连接),MainReactor将连接分配给SubReactor。(即:MainReactor只负责监听客户端连接请求,和客户端建立连接之后将连接交由SubReactor监听后面的IO事件)
- SubReactor将连接加入到自己的连接队列进行监听,并创建Handler对各种事件进行呢处理
- 当连接上有新事件发生的时候,SubReactor就会调用对应的Handler处理
- Handler通过read从连接上读取请求数据,将请求数据分发给worker线程池进行业务处理
- Worker线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给Handler。Handler通过send向客户端发送响应数据
- 一个MainReactor可以对应多个SubReactor,即一个MainReactor线程可以对应多个SubReactor线程
优点:
- MainReactor线程与SubReactor线程的数据交互简单职责明确,MainReactor线程只需要接收新连接,SubReactor线程完成后续的业务处理
- MainReactor线程与SubReactor线程的数据交互简单,MainReactor线程只需要把新连接传给SubReactor线程,SubReactor线程无需返回数据
- 多个SubReactor线程能够应对更高的并发请求
缺点:这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括Nginx、Memcached、Netty等。这种模式也被叫做服务器的1+M+N线程模式,即使用该模式开发的服务器包含一个(或多个,1只是表示相对较少)连接建立线程+M个IO线程+N个业务处理线程。这是业界成熟的服务器程序设计模式。
2.4 Netty线程模型
? Netty的主要设计基于主从Reactor多线程模式,并做了一定的改进。
-
简单版Netty模型 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sRlQk8XL-1650104078267)(RPC框架设计与分布式理论.assets/image-20220406232135371.png)]
- BossGroup线程维护Selector,ServerSocketChannel注册到这个Selector上,只关注连接建立请求事件(主Reactor)
- 当接收到来自客户端的连接建立请求事件的时候,通过ServerSocketChannel.accept方法获得对应的SocketChannel,并封装成NioSocketChannel注册到WorkerGroup线程中的Selector,每个Selector运行在一个线程中(从Reactor)
- 当WorkerGroup县城中的Selector坚听到自己感兴趣的IO事件后,就调用Handler进行处理
-
进阶版Netty模型 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-23kgpcvp-1650104078267)(RPC框架设计与分布式理论.assets/image-20220406232529268.png)]
- 有两组线程池:BossGroup和WorkerGroup,BossGroup中的线程专门负责和客户端建立连接,WorkerGroup中的线程专门负责处理连接上的读写
- BossGroup和WorkerGroup含有多个不断循环的执行事件处理的线程,每个线程都包含一个Selector,用于监听在其上的Channel
- 每个 BossGroup 中的线程循环执行以下三个步骤
- 轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- 处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到WorkerGroup 中某个线程上的 Selector 上
- 再去以此循环处理任务队列中的下一个事件
- 每个 WorkerGroup 中的线程循环执行以下三个步骤
- 轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- 在对应的 NioSocketChannel 上处理 read/write 事件
- 再去以此循环处理任务队列中的下一个事件
-
详细版Netty模型 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sVqpd2Y7-1650104078267)(RPC框架设计与分布式理论.assets/image-20220406233439894.png)]
- Netty抽象出两组线程池:BossGroup和WorkerGroup,也可以叫做BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就是一个 NioEventLoop
- NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)
- NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop
- 每个 BossNioEventLoop 中循环执行以下三个步骤
- select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件)
- processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上
- runAllTasks:再去以此循环处理任务队列中的其他任务
- 每个 WorkerNioEventLoop 中循环执行以下三个步骤
- select:轮训注册在其上的 NioSocketChannel 的 read/write 事件(OP_READ/OP_WRITE 事件)
- processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件
- runAllTasks:再去以此循环处理任务队列中的其他任务
- 在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器(拦截处理器、过滤处理器、自定义处理器等)。
3、核心API介绍
3.1 ChannelHandler及其实现类
-
Channel接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。API关系如下图所示: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TsEI63Pt-1650104078268)(RPC框架设计与分布式理论.assets/image-20220407223753458.png)] -
Netty开发中需要自定义一个Handler类去实现ChannelHandle接口或其子接口或其实现类,然后通过重写相应方法实现业务逻辑,一般需要重写:
- public void channelActive(ChannelHandlerContext ctx),通道就绪事件
- public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件
- public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事件
3.2 ChannelPipeline
-
ChannelPipeline是一个Handler的集合,它负责处理拦截inbound和outbound的事件和操作,相当于一个贯穿Netty的责任链 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SIzBEAlN-1650104078268)(RPC框架设计与分布式理论.assets/image-20220407224119620.png)] -
如果客户端和服务器的Handler是一样的,消息从客户端到服务端或者反过来,每个Inbound类型或Outbound类型的Handler只会经过一次,混合类型的Handler(实现了Inbound和Outbound的Handler)会经过两次。准确的说ChannelPipeline中是一个ChannelHandlerContext,每个上下文对象中有ChannelHandler. InboundHandler是按照Pipleline的加载顺序的顺序执行, OutboundHandler是按照Pipeline的加载顺序,逆序执行
3.3 ChannelHandlerContext
- 这是事件处理上下文对象,Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler ,同时ChannelHandlerContext 中也绑定了对应的 ChannelPipeline和 Channel 的信息,方便对ChannelHandler 进行调用。常用方法如下所示:
- ChannelFuture close(),关闭通道
- ChannelOutboundInvoker flush(),刷新
- ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前ChannelHandler 的下一个 ChannelHandler 开始处理(出站)
3.4 ChannelOption
- Netty 在创建 Channel 实例后,一般都需要设置 ChannelOption 参数。ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:
- ChannelOption.SO_BACKLOG:对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。
- ChannelOption.SO_KEEPALIVE:一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
3.5 ChannelFuture
- 表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的,I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。
- 常用方法如下所示:
- Channel channel(),返回当前正在进行 IO 操作的通道
- ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步
3.6 EventLoopGroup和实现类NioEventLoopGroup
-
EventLoopGroup 是一组 EventLoop 的抽象,Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。 -
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。 通常一个服务端口即一个 ServerSocketChannel对应一个Selector 和一个EventLoop线程。 BossEventLoop 负责接收客户端的连接并将SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示: [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LqZUq9Ns-1650104078268)(RPC框架设计与分布式理论.assets/image-20220407224753741.png)] -
BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。 -
一般情况下我们都是用实现类NioEventLoopGroup -
常用方法如下所示:
- public NioEventLoopGroup(),构造方法,创建线程组
- public Future<?> shutdownGracefully(),断开连接,关闭线程
3.7 ServerBootstrap和Bootstrap
- ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置;Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop
- public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop
- public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现
- public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置
- public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通道添加配置
- public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处理类(自定义的 handler)
- public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号
- public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端
3.8 Unpooled类
这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:
- public static ByteBuf copiedBuffer(CharSequence string, Charset charset),通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)
4、Netty入门案例
Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
4.1 Netty服务端编写
-
服务端实现步骤:
- 创建bossGroup线程组:处理网络事件—连接事件
- 创建workerGroup线程组:处理网络事件—读写事件
- 创建服务端启动助手
- 设置bossGroup线程组和workerGroup线程组
- 设置服务端通道实现为NIO
- 设置参数
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动服务端并绑定端口,同时将异步改为同步
- 关闭通道和关闭连接池
-
代码实现:
- NettyServer
package online.yuanle.demo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(9999).sync();
System.out.println("服务端启动成功……");
future.channel().closeFuture().sync();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
- 自定义服务端handle
package online.yuanle.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyServerHandler implements ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
System.out.println("客户端发送过来的消息 = " + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext context) throws Exception {
context.writeAndFlush(Unpooled.copiedBuffer("你好,我是Netty服务端……", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
throwable.printStackTrace();
channelHandlerContext.close();
}
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
}
4.2 Netty客户端编写
-
客户端实现步骤:
- 创建线程组
- 创建客户端启动助手
- 设置线程组
- 设置客户端通道实现为NIO
- 创建一个通道初始化对象
- 向pipeline中添加自定义业务处理handler
- 启动客户端,等待连接服务端,同时将异步改为同步
- 关闭通道和关闭连接池
-
代码实现
- NettyClient
package online.yuanle.demo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();
channelFuture.channel().closeFuture().sync();
group.shutdownGracefully();
}
}
- 自定义客户端业务处理
package online.yuanle.demo;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.util.CharsetUtil;
public class NettyClientHandler implements ChannelInboundHandler {
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好呀我是Netty客户端", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
ByteBuf byteBuf = (ByteBuf) o;
System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
}
}
5、Netty异步模型
5.1 基本介绍
-
异步的概念和同步相对,当一个一步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TN1J6110-1650104078269)(RPC框架设计与分布式理论.assets/image-20220411184133456.png)] -
Netty中的IO操作都是异步的,包括Bing、Write、Connect等操作会简单的返回一个ChannelFuture。调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或通过机制获得IO操作结果。Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适,那么可以在调用fun得到时候,立马返回一个Future,后续可以通过Future去监控fun方法的处理过程(即:Future-Listener 机制)
5.2 Future和Future-listener
-
Future
- 表示异步执行结果,可以通过它提供的方法来检测执行是否完成,ChannelFuture是他的一个子接口。ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知到监听器
- 当Future对象刚刚被创建时,出于非完成状态,调用者可以通过返回的ChannelFuture来获取操作的执行状态,注册监听函数来执行完成后的操作。
- 常用方法:
- sync 方法, 阻塞等待程序结果反回
- isDone 方法来判断当前操作是否完成;
- isSuccess 方法来判断已完成的当前操作是否成功;
- getCause 方法来获取已完成的当前操作失败的原因;
- isCancelled 方法来判断已完成的当前操作是否被取消;
- addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果Future 对象已完成,则通知指定的监听器
-
Future-Listener机制
- 给Future添加监听器,监听操作及结果,代码实现
ChannelFuture future = serverBootstrap.bind(9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("端口绑定成功!");
}else {
System.out.println("绑定失败");
}
}
});
System.out.println("服务端启动成功……");
ChannelFuture future = channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("你好呀我是Netty客户端", CharsetUtil.UTF_8));
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("成功");
}else {
System.out.println("失败");
}
}
});
四、Netty高级应用
1、Netty编解码器
1.1 Java的编解码
-
编码(Encode)称为序列化,它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。 -
解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4jl6QZfx-1650104078269)(RPC框架设计与分布式理论.assets/image-20220411195742650.png)] -
Java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过 java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化。 -
Java序列化目的:1.网络传输。2.对象持久化 -
Java序列化缺点:1.无法跨语言。 2.序列化后码流太大。3.序列化性能太低。 -
Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框架,这些编解码框架实现消息的高效序列化。
4.2 Netty编解码器
-
概念
- 在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换,网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码。
- 对于Netty而言,编解码器由两部分组成:编码器、解码器
- 解码器:负责将消息从字节或其它序列形式转成指定的消息对象。
- 编码器:将消息对象转成字节或其它序列形式在网络上传输。
- Netty的编(解)码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖于ChannelPipeline,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。
- Netty里面的编解码: 解码器:负责处理入站InboundHandler数据。 编码器:负责出站OutboundHandler数据
-
解码器(Decoder)
-
解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据是抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中。对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ceXDHyXu-1650104078270)(RPC框架设计与分布式理论.assets/image-20220411205614341.png)] -
抽象解码器
- ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节
- ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
- MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)
-
核心方法:
-
代码实现:
package online.yuanle.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.util.CharsetUtil;
import java.util.List;
public class MessageDecoder extends MessageToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
System.out.println("正在进行消息解码……");
ByteBuf byteBuf = (ByteBuf) msg;
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}
}
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
System.out.println("客户端发送过来的消息 = " + o);
}
channel.pipeline().addLast("messageDecoder", new MessageDecoder());
channel.pipeline().addLast(new NettyServerHandler());
-
编码器(Encoder)
-
与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都实现ChannelOutboundHandler接口。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gjy8OoDF-1650104078270)(RPC框架设计与分布式理论.assets/image-20220411210412961.png)] -
抽象编码器
- MessageToByteEncoder: 将消息转化成字节
- MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO)
-
抽象方法
-
代码实现:
package online.yuanle.codec;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.util.CharsetUtil;
import java.util.List;
public class MessageEncoder extends MessageToMessageEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
System.out.println("消息正在进行编码……");
String str = (String) msg;
list.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
ChannelFuture future = channelHandlerContext.writeAndFlush("你好呀我是Netty客户端");
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("成功");
}else {
System.out.println("失败");
}
}
});
}
channel.pipeline().addLast("messageDecoder", new MessageDecoder());
channel.pipeline().addLast("messageEncoder", new MessageEncoder());
channel.pipeline().addLast(new NettyServerHandler());
-
编码解码器Codec
-
编码解码器: 同时具有编码与解码功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MVbDwJ2u-1650104078270)(RPC框架设计与分布式理论.assets/image-20220411210836655.png)] -
Netty提供提供了一个ChannelDuplexHandler适配器类,编码解码器的抽象基类ByteToMessageCodec ,MessageToMessageCodec都继承与此类 -
代码实现: package online.yuanle.codec;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.CharsetUtil;
import java.util.List;
public class MessageCodec extends MessageToMessageCodec {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
System.out.println("消息正在进行编码……");
String str = (String) msg;
list.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, Object msg, List list) throws Exception {
System.out.println("正在进行消息解码……");
ByteBuf byteBuf = (ByteBuf) msg;
list.add(byteBuf.toString(CharsetUtil.UTF_8));
}
}
-
启动类:
channel.pipeline().addLast("messageCodec", new MessageCodec());
channel.pipeline().addLast(new NettyServerHandler());
2、Netty案例-群聊天室
案例需求:
- 编写一个Netty群聊系统,实现服务端和客户端之间的简单通讯
- 实现多人聊天
- 服务端:可以监测用户上线离线,并实现消息转发功能
- 客户端:可以发送消息给其它所有用户,同时可以接收其它用户发送的消息
2.1 聊天室服务端编写
-
NettyChatServer package online.yuanle.chat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class NettyChatServer {
private int port;
public NettyChatServer(int port) {
this.port = port;
}
public void run(){
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new StringEncoder());
channel.pipeline().addLast(new NettyChatServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("端口绑定成功!");
}else {
System.out.println("绑定失败");
}
}
});
System.out.println("聊天室服务端启动成功……");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyChatServer nettyChatServer = new NettyChatServer(9998);
nettyChatServer.run();
}
}
-
NettyChatServerHandler package online.yuanle.chat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.ArrayList;
import java.util.List;
public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {
public static List<Channel> channelList = new ArrayList<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelList.add(channel);
System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "在线……");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelList.remove(channel);
System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "下线……");
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
Channel channel = channelHandlerContext.channel();
channelList.forEach(channel1 -> {
if (channel != channel1){
channel1.writeAndFlush("["+channel.remoteAddress().toString().substring(1)+"]说:" + msg);
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
cause.printStackTrace();
channelList.remove(channel);
System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "异常……");
}
}
2.2 聊天室客户端编写
- NettyChatClient
package online.yuanle.chat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import online.yuanle.demo.NettyClientHandler;
import java.util.Scanner;
public class NettyChatClient {
private String ip;
private int port;
public NettyChatClient(String ip, int port) {
this.ip = ip;
this.port = port;
}
public void run() {
EventLoopGroup group = null;
try {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new StringEncoder());
channel.pipeline().addLast(new NettyChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
Channel channel = channelFuture.channel();
System.out.println("----"+channel.localAddress().toString().substring(1)+"-----");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()){
String msg = scanner.nextLine();
channel.writeAndFlush(msg);
}
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyChatClient nettyChatClient = new NettyChatClient("127.0.0.1", 9998);
nettyChatClient.run();
}
}
- NettyChatClientHandler
package online.yuanle.chat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg);
}
}
3、基于Netty的HTTP服务器开发
3.1 介绍
- Netty的HTTP协议栈无论在性能还是可靠性上,都表现优异,非常适合在非Web容器的场景下应用,相比于传统的Tomcat、Jetty等Web容器,它更加轻量和小巧,灵活性和定制性也更好。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jAtrGyPz-1650104078271)(RPC框架设计与分布式理论.assets/image-20220416131628619.png)]
3.2 功能需求
- Netty服务器在8080端口监听
- 浏览器发出请求
- 服务器可以回复消息给客户端,并对特定资源进行过滤
3.3 服务端代码实现
-
NettyHttpServer package online.yuanle.http;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import online.yuanle.chat.NettyChatServer;
import online.yuanle.chat.NettyChatServerHandler;
public class NettyHttpServer {
private int port;
public NettyHttpServer(int port) {
this.port = port;
}
public void run(){
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
try {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new HttpServerCodec());
channel.pipeline().addLast(new NettyHttpServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()){
System.out.println("端口绑定成功!");
}else {
System.out.println("绑定失败");
}
}
});
System.out.println("HTTP服务端启动成功……");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyHttpServer nettyHttpServer = new NettyHttpServer(8080);
nettyHttpServer.run();
}
}
-
NettyHttpServerHandler package online.yuanle.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg;
System.out.println("浏览器请求路径:" + request.uri());
if ("/favicon.ico".equals(request.uri())){
System.out.println("图标不响应");
return;
}
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello!我是Netty服务器", CharsetUtil.UTF_8);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(response);
}
}
}
(); workerGroup.shutdownGracefully(); } }
public static void main(String[] args) {
NettyHttpServer nettyHttpServer = new NettyHttpServer(8080);
nettyHttpServer.run();
}
}
2. NettyHttpServerHandler
```java
package online.yuanle.http;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
/**
* @author long chen
* @date 2022-04-16 11:50
* @description HTTP服务器处理类
*/
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
/**
* 读取就绪事件
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
//1.判断请求是不是HTTP请求
if (msg instanceof HttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg;
System.out.println("浏览器请求路径:" + request.uri());
if ("/favicon.ico".equals(request.uri())){
System.out.println("图标不响应");
return;
}
//2.给浏览器进行响应
ByteBuf byteBuf = Unpooled.copiedBuffer("Hello!我是Netty服务器", CharsetUtil.UTF_8);
DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
//设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
//设置响应长度
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
channelHandlerContext.writeAndFlush(response);
}
}
}
|