前言
此篇博文主要根据尚硅谷的视频课程把使用Netty需要了解的知识进行整理并根据自己的理解加以补充。由于内容较多所以将笔记分为多个部分,同时更新速度可能会比较慢,希望读者多包涵。
Netty 介绍
- Netty 是由
JBOSS 提供的一个Java 开源框架,现为Github上的独立项目。 Netty 是一个异步的,基于事件驱动 的网络应用框架,用一快速开发高性能、高可靠 的网络IO程序。- Netty 主要针对
TCP协议 下,面向Clients端高并发应用,或者Peer-to-Peer(简称P2P)场景下的大量数据持续传输的应用。 - Netty
本质 是一个NIO框架,使用与服务器通信相关的多种应用场景。
Netty 使用场景
互联网行业
- 在分布式系统中,各个节点之间需要
远程服务调用 ,高性能的RPC框架必不可少,Netty作为异步高性能通信框架,往往作为基础通信组件 被这些RPC框架使用。 - 典型的应用有:阿里分布式服务框架
Dubbo 的RPC框架,使用协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各个进程节点之间的内部通信。
游戏行业
- 无论是手游服务端还是大型网络游戏Java语言得到了越来越广的应用。
- Netty作为高性能的基础通信组件,提供了TCP/UDP和HTTP协议栈,方便定制和开发私有协议栈,账号登陆服务器。
- 地图服务器之间可以方便的通过Netty进行高性能的通信。
大数据领域
- 经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信。
- 它的 NettyService 基于 Netty 框架二次封装实现。
Java IO模型
IO模型
-
I/O模型的简单理解:就是用什么样的通道进行数据发送和接受,很大程度上决定了程序的通信性能。 -
Java 支持3种网络编程模型:BIO/NIO/AIO -
Java BIO :同步且阻塞 ,服务器实现模式是一个连接一个线程,即客户端有连接请求是服务端就需要启一个线程来进行处理,如果这个连接一直不处理会造成不必要的线程开销。 -
Java Nio :同步非阻塞,服务器实现模式为一个线程处理多个连接请求,即客户端发送的连接请求 会注册 到多路复 用器上,多路复用器轮询到连接有I/O请求进行处理。 -
Java AIO(NIO.2) :异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
BIO,NIO,AIO场景分析比较
- BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
NIO 方式适用于 连接数目多且连接比较短(轻操作) 的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。AIO 方式使用于连接数目多且连接比较长(重操作) 的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
Java BIO 编程
Java BIO介绍
- Java BIO 就是传统的 Java I/O 编程,其相关的类和接口在 java.io。
- BIO(BlockingI/O):同步阻塞,服务器实现模式为一个连接一个线程, 即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。
- BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,程序简单易理解。
Java BIO工作机制
BIO工作机制梳理
- 服务器端启动一个
ServerSocket 。 - 客户端启动
Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯。 - 客户端
发出请求 后,先咨询服务器 是否有线程响应,如果没有则会等待,或者被拒绝。如果有响应,客户端线程会等待请求结束后,再继续执行。
Java BIO编程实例
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
ExecutorService executorService = new ThreadPoolExecutor(4,8,0L,TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>());
ServerSocket serverSocket = new ServerSocket(12345);
log.info("服务启动了");
while (true){
Socket socket = serverSocket.accept();
log.info("有客户端连接了");
executorService.execute(new Runnable() {
@Override
public void run() {
byte[] bytes = new byte[1024];
try {
while (true){
InputStream inputStream = socket.getInputStream();
int read = inputStream.read(bytes);
if(read!=-1){
System.out.println(new String(bytes,0,read));
}else {
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}finally {
System.out.println("服务端关闭");
}
}
});
}
}
}
客户端采用telnet:
问题分析
- 每个请求都需要创建独立的线程,与对应的客户端进行数据Read、Write。
- 当并发数较大时需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程阻塞在Read操作上,造成资源浪费。
Java NIO 编程
Java NIO介绍
- Java NIO 全称
Java non-blocking IO ,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 NewIO),是同步非阻塞 的。 - NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。
- NIO 有三大核心部分:
Channel(通道)、Buffer(缓冲区)、Selector(选择器) 。 - NIO 是
面向缓冲区,或者面向块编程 的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式 的高伸缩性网络。 - Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。
- 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
HTTP 2.0 使用了多路复用 的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP 1.1 大了好几个数量级。
NIO 与 BIO比较
BIO 以流的方式处理数据 ,而 NIO 以块 的方式处理数据,块 I/O 的效率比流 I/O 高很多。BIO 是阻塞 的,NIO 则是非阻塞 的。BIO 基于字节流和字符流进行操作 ,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作 ,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。```Selector````(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道。- Buffer和Channel之间的
数据流向是双向 的
NIO三大核心原理图
NIO 的 Selector、Channel 和、Buffer 之间关系
三大核心关系解读
1. 每个 Channel 都会对应一个 Buffer。 2. Selector 对应一个线程,一个线程对应多个 Channel(连接)。 3. 该图反应了有三个 Channel 注册到该 Selector //程序 4. 程序切换到哪个 Channel 是由事件决定的,Event 就是一个重要的概念。 5. Selector 会根据不同的事件,在各个通道上切换。 6. Buffer 就是一个内存块,底层是有一个数组。 7. 数据的读取写入是通过 Buffer,这个和 BIO是不同的,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIO 的 Buffer 是可以读也可以写,需要 flip 方法切换 Channel 是双向的,可以返回底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的。
缓冲区Buffer
基本介绍
缓冲区(Buffer): ??缓冲区本质上是一个可以读写数据的内存块,Jdk1.8里面是这么说的A container for data of a specific primitive type也可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。
Buffer及其子类
1.在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类,类的层级关系图:
2.Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:
基本属性: capacity, limit, and position
??初始化元素状态:新建的buffer的position始终是0,mark是未被标记,limit是0也可以是其他值这依赖于buffer的类型和使用方式
A newly-created buffer always has a position of zero and a mark that is
undefined. The initial limit may be zero, or it may be some other value
that depends upon the type of the buffer and the manner in which it is
constructed. Each element of a newly-allocated buffer is initialized
to zero.
Clearing, flipping, and rewinding
除了访问position, limit, capacity,mark,reset之外,此buffer类也定义了如下操作: clear() :为一个新的通道读取序列或相对于put操作准备好缓冲区:limit,capacity,position都设为0 flip() :为一个新的通道写入序列或相对于get操作准备好缓冲区:此时limit 设置为当前的position ,position设置为0 rewind() :为重复读取它已经包含的数据准备一个缓冲区。此时limit不变 ,position设置为0
线程安全(Thread safety)
- 当
多线程 访问时缓冲区不是线程安全 ,如果要使用多线程访问缓冲区则适当使用同步控制 - 允许链式调用:
b.flip().position(23).limit(42);
Channel(通道)
channel基本介绍
- NIO 的通道类似于流,但有些区别如下:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲:
- BIO 中的 Stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。
- Channel 在 NIO 中是一个接口 public interface Channel extends Closeable{}
常用的 Channel 类有:FileChannel、DatagramChannel、ServerSocketChannel 和 SocketChannel。【ServerSocketChanne 类似 ServerSocket、SocketChannel 类似 Socket】 FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。
FileChannel 类
FileChannel 主要用来对本地文件进行 IO 操作,常见的方法有: public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中 public int write(ByteBuffer src) ,把缓冲区的数据写到通道中 public long transferFrom(ReadableByteChannel src, long position, long count) ,从目标通道中复制数据到当前通道 public long transferTo(long position, long count, WritableByteChannel target) ,把数据从当前通道复制给目标通道
FileChannel 向本地文件写入数据:实例一
public class FIleServer {
public static void main(String[] args) {
try {
FileOutputStream fileOutputStream = new FileOutputStream("d:\\filechannel.txt");
FileChannel fileChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。".getBytes());
byteBuffer.flip();
fileChannel.write(byteBuffer);
fileOutputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
FileChannel 向本地文件写入数据:实例二
public class FileInputServer {
public static void main(String[] args) {
String pathname = "d:\\filechannel.txt";
File file = new File(pathname);
try {
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel fileChannel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
fileInputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
FileChannel 向本地文件写入数据:实例三
实例说明: 使用一个 Buffer 完成文件读取、写入 从filechannel.txt中读取数据。 将读取的数据写到file03.txt。 实例演示。
public class FileChannel03 {
public static void main(String[] args) {
try {
String pathname = "d:\\filechannel.txt";
File file = new File(pathname);
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel channel1 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file03.txt");
FileChannel channel2 = fileOutputStream.getChannel();
ByteBuffer allocate = ByteBuffer.allocate((int)file.length());
while(true){
allocate.clear();
int read = channel1.read(allocate);
if(read==-1){
break;
}
allocate.flip();
channel2.write(allocate);
}
fileOutputStream.close();
fileInputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
FileChannel 向本地文件写入数据:实例四
使用 FileChannel(通道)和方法 transferFrom,完成文件的拷贝 实例演示
public class FileChannel04 {
public static void main(String[] args) {
try {
String pathname = "D:\\images\\buffer.jpg";
File file = new File(pathname);
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel channel1 = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("d:\\buffer.jpg");
FileChannel channel2 = fileOutputStream.getChannel();
channel2.transferFrom(channel1,0,channel1.size());
fileOutputStream.close();
fileInputStream.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
重点方法说明:
- **
transferFrom **是一个抽象方法,主要是将字节从给定的可读字节传输到此通道的文件中。
public abstract long transferFrom(ReadableByteChannel src,long position, long count)throws IOException; ReadableByteChannel继承Channel,一个可读取字节的通道(A channel that can read bytes)。
- <p> Only one read operation upon a readable channel may be in progress at
- any given time. If one thread initiates a read operation upon a channel
- then any other thread that attempts to initiate another read operation will
- block until the first operation is complete. Whether or not other kinds of
- I/O operations may proceed concurrently with a read operation depends upon
- the type of the channel. </p>
- 任何时间可读信道上只能进行一次读取操作。
- 如果一个线程在通道上启动读操作,则尝试启动另一个读操作的任何其他线程将阻塞,直到第一个操作完成。
- 其他类型的I / O操作是否可以与读操作同时进行取决于通道的类型。
Buffer 和 Channel的附加说明
- **
Buffer **存数据和取数据类型要一致。 - Buffer可以转成**
只读 **Buffer。 - 可以使用**
MappedByteBuffer **让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由 NIO 来完成。 - Nio支持多个**
Buffer **来完成数据读写,即 Scattering 和 Gathering 。见ServerSocketChannel实例演示
ServerSocketChannel 实例演示
public class ScatteringAndGathering05 {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 1234);
ServerSocket socket = serverSocketChannel.socket();
socket.bind(inetSocketAddress);
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
SocketChannel accept = serverSocketChannel.accept();
int mesLength = 8;
while (true){
int readLen = 0;
while (readLen < mesLength){
long read = accept.read(byteBuffers);
readLen += read;
System.out.println("readLen = " + readLen);
Arrays.asList(byteBuffers).stream().map(buffer -> "position = " + buffer.position() + ", limit = " + buffer.limit()).forEach(System.out::println);
}
Arrays.asList(byteBuffers).forEach(v->v.flip());
long byteWirte = 0;
while (byteWirte < mesLength) {
long l = accept.write(byteBuffers);
byteWirte += l;
}
Arrays.asList(byteBuffers).forEach(buffer -> {
buffer.clear();
});
System.out.println("readLen = " + readLen + ", byteWrite = " + byteWirte + ", mesLength = " + mesLength);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
|