缓冲区buffer、Channel通道、 selector选择器
1 buffer基本介绍
缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer.
常用Buffer子类一览
- ByteBuffer,存储字节数据到缓冲区
- shortBuffer,存储字符串数据到缓冲区
- CharBuffer,存储字符数据到缓冲区
- lntBuffer,存储整数数据到缓冲区
- LongBuffer,存储长整型数据到缓冲区
- DoubleBuffer,存储小数到缓冲区
- FloatBuffer,存储小数到缓冲区
import java.nio.IntBuffer;
public class BasicBuffer {
public static void main(String[] args) {
IntBuffer intBuffer = IntBuffer.allocate(5);
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i*2);
}
intBuffer.flip();
while(intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
}
}
2 Channel基本介绍
- NIO的通道类似于流,但有些区别如下:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲;
-
BIO中的stream是单向的,例如 FilelnputStream对象只能进行读取数据的操作,而NIO中的通道(Channel)是双向的,可以读操作,也可以写操作。 -
Channel在NIO中是一个接口public interface Channel extends Closeable -
常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel【ServerSocketChannel类似ServerSocket,SocketChannel类似Socket】。 -
FileChannel用于文件的数据读写,DatagramChannel用于UDP的数据读写,ServerSocketChannel和SocketChannel用于TCP的数据读写。
通道(Channel)FileChannel类 Filehannel主要用来对本地文件进行IO操作,常见的方法有
-
public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中 -
public int write(ByteBuffer src) ,把缓冲区的数据写到通道中 -
public long transferFrom(ReadableByteChannel src, long position, long count) ,从目标通道中复制数据到当前通道 -
public long transferTo(long position, long count, WritableByteChannel target) ,把数据从当前通道复制给目标通道
package personal.netty.nio;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NioFileChannelTest1 {
public static void main(String[] args) throws IOException {
String str="hello world!";
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file1.txt");
FileChannel fileChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(str.getBytes());
byteBuffer.flip();
fileChannel.write(byteBuffer);
fileOutputStream.close();
System.out.println("存储数据结束");
}
}
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NioFileChannelTest2 {
public static void main(String[] args) throws IOException {
File file = new File("D:\\tool\\downloads\\test.txt");
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel fileChannel = fileInputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate((int) file.length());
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(),"GBK"));
fileInputStream.close();
}
}
package personal.netty.nio;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class NioFileChannelTest3 {
public static void main(String[] args) throws IOException {
FileInputStream fileInputStream = new FileInputStream("d:\\file1.txt");
FileChannel inputStreamChannel = fileInputStream.getChannel();
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file2.txt");
FileChannel outputStreamChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true){
byteBuffer.clear();
int read = inputStreamChannel.read(byteBuffer);
System.out.println("read:"+read);
if(read==-1){
break;
}
byteBuffer.flip();
outputStreamChannel.write(byteBuffer);
}
fileInputStream.close();
fileOutputStream.close();
}
}
3 buffer和channel的注意事项
- ByteBuffer 支持类型化的put 和 get, put放入的是什么数据类型,get就应该使用相应的数据类型来取出,否则可能有BufferUnderflowException 异常。
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.putInt(10);
buffer.putLong(10000);
buffer.putChar('是');
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
-
可以将一个普通Buffer转成只读Buffer -
NIO还提供了MappedByteBuffer,可以让文件直接在内存(堆外的内存)中进行修改,而如何同步到文件由NIO来完成。
package personal.netty.nio;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
public class MapperByteBufferTest {
public static void main(String[] args) throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("d:\\file1.txt", "rw");
FileChannel channel = randomAccessFile.getChannel();
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
mappedByteBuffer.put(0, (byte) 'q');
mappedByteBuffer.put(3, (byte) 'l');
randomAccessFile.close();
System.out.println("修改结束");
}
}
- 前面的读写操作,都是通过一个Buffer完成的,NIO还支持通过多个Buffer(即 Buffer数组)完成读写操作,即Scattering 和Gatering
package personal.netty.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
public class ScatteringAndGatheringTest {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(7001);
serverSocketChannel.socket().bind(inetSocketAddress);
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0]=ByteBuffer.allocate(5);
byteBuffers[1]=ByteBuffer.allocate(3);
SocketChannel socketChannel = serverSocketChannel.accept();
int messageLength=8;
System.out.println("启动成功");
while (true) {
int byteRead=0;
while (byteRead<messageLength){
long l = socketChannel.read(byteBuffers);
byteRead+=l;
System.out.println("byteRead="+byteRead);
Arrays.asList(byteBuffers).stream()
.map(buffer->"position="+buffer.position()+",limit="+buffer.limit())
.forEach(System.out::println);
}
Arrays.asList(byteBuffers).forEach(buffer->buffer.flip());
long byteWrite=0;
while (byteWrite<messageLength){
long write = socketChannel.write(byteBuffers);
byteWrite+=write;
}
Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.clear());
System.out.println("byteRead:="+byteRead+"; byteWrite:="+byteWrite);
}
}
}
4 selector选择器
4.1 Selector
-
Java的 NIO,用非阻塞的I0方式。可以用一个线程,处理多个的客户端连接,就会使用到selector(选择器) -
Selector能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。 -
只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程 -
避免了多线程之间的上下文切换导致的开销
4.2 NioEventLoop
-
Netty的IO线程NioEventLoop聚合了Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。 -
当线程从某客户端socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。 -
线程通常将非阻塞I/O的空闲时间用于在其他通道上执行I/O操作,所以单独的线程可以管理多个输入和输出通道。 -
由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁l/O阻塞导致的线程挂起。 -
一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程的问题,模型架构的性能、弹性伸缩能力和可靠性得到了提升。
- 当客户端连接时,会通过ServerSocketChannel得到SocketChannel
- Selector进行监听select 方法,返回有事件发生的通道的个数
- 将socketChannel注册到Selector上,register(Selector sel, int ops),一个selector上可以注册多个SocketChannel
- 注册后返回一个 SelectionKey,会和该Selector关联(集合)
- 进一 步得到各个SelectionKey (有事件发生)
- 在通过 SelectionKey反向获取SocketChannel,方法channel()
- 可以通过 得到的channel ,完成业务处理
4.3 Selector例子
编写一个NIO的入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
package personal.netty.nio.case1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NioServer {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress endpoint = new InetSocketAddress(6666);
serverSocketChannel.socket().bind(endpoint);
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
if(selector.select(1000)==0){
System.out.println("服务端等待了1秒,无连接");
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
if(key.isAcceptable()){
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("客户端连接成功,生成了一个socketChannel:"+socketChannel.hashCode());
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
channel.read(buffer);
System.out.println("从客户端接收的数据是:"+new String(buffer.array()));
}
keyIterator.remove();
}
}
}
}
package personal.netty.nio.case1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class NioClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
if(!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
}
}
String str="hello world! 冲冲冲";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
socketChannel.write(buffer);
System.in.read();
}
}
SelectionKey, 表示Selector和网络通道的注册关系,共四种:
- int OP_ ACCEPT: 有新的网络连接可以accept,值为16
- int OP_ CONNECT: 代表连接已经建立,值为8
- int OP_ WRITE: 代表写操作,值为4
- int OP_ READ:代表读操作,值为1
源码中:
public static final int OP_ READ=1<< 0;
public static final int OP_ WRITE =1<<2;
public static final int OP_ CONNECT =1<< 3;
public static final int OP_ ACCEPT=1<<4;
参考:https://www.bilibili.com/video/BV1DJ411m7NR?p=116&spm_id_from=pageDriver
|