目前很多高性能的Java RPC框架都是基于Netty实现的,而Netty的设计原理又离不开Java NIO。本篇笔记是对NIO核心三件套:缓冲区(Buffer)、选择器 (Selector)和通道(Channel),其中后两者选择器与通道实现原理的学习总结。
一、NIO聊天室入门案例
在学习原理之前,先来了解一个Java NIO实现聊天室的小案例,该案例只有三个类:NioServer 聊天室服务端、NioClient 聊天室客户端、ClientThread 客户端线程。
服务端代码:
public class NioServer {
Map<String, SocketChannel> memberChannels;
private static final int PORT = 8080;
private Selector selector;
private ServerSocketChannel server;
private ByteBuffer buffer;
public NioServer() throws IOException {
this.selector = Selector.open();
this.server = getServerChannel(selector);
this.buffer = ByteBuffer.allocate(1024);
memberChannels = new ConcurrentHashMap<>();
}
private ServerSocketChannel getServerChannel(Selector selector) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
return serverSocketChannel;
}
public void listen() throws IOException {
System.out.println("服务端启动......");
try {
while (true) {
int count = selector.select();
if (count == 0) {
continue;
}
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = keySet.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端连接:"
+ channel.socket().getInetAddress().getHostName() + ":"
+ channel.socket().getPort());
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
buffer.clear();
channel.read(buffer);
buffer.flip();
String msg = decoder.decode(buffer).toString();
if (!"".equals(msg)) {
System.out.println("收到:" + msg);
if (msg.startsWith("username=")) {
String username = msg.replaceAll("username=", "");
memberChannels.put(username, channel);
System.out.println("用户总数:" + memberChannels.size());
} else {
String[] arr = msg.split(":");
if (arr.length == 3) {
String from = arr[0];
String to = arr[1];
String content = arr[2];
System.out.println(from + "发送给" + to + "的消息:" + content);
if (memberChannels.containsKey(to)) {
CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
memberChannels.get(to).write(encoder.encode(CharBuffer.wrap(from + ":" + content)));
}
}
}
}
}
}
}
}catch (Exception e){
System.out.println("服务端启动失败......");
e.printStackTrace();
}finally {
try {
selector.close();
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new NioServer().listen();
}
}
客户端线程类:
public class ClientThread extends Thread {
private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
private CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
private Selector selector = null;
private SocketChannel socket = null;
private SelectionKey clientKey = null;
private String username;
public ClientThread(String username) {
try {
selector = Selector.open();
socket = SocketChannel.open();
socket.configureBlocking(false);
clientKey = socket.register(selector, SelectionKey.OP_CONNECT);
InetSocketAddress ip = new InetSocketAddress("localhost", 8080);
socket.connect(ip);
this.username = username;
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (true) {
selector.select();
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending())
channel.finishConnect();
channel.register(selector, SelectionKey.OP_READ);
System.out.println("连接服务器端成功!");
send("username=" + this.username);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(50);
channel.read(buffer);
buffer.flip();
String msg = decoder.decode(buffer).toString();
System.out.println("收到:" + msg);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
selector.close();
socket.close();
} catch (IOException e) {
}
}
}
public void send(String msg) {
try {
SocketChannel client = (SocketChannel) clientKey.channel();
client.write(encoder.encode(CharBuffer.wrap(msg)));
} catch (Exception e) {
e.printStackTrace();
}
}
public void close() {
try {
selector.close();
socket.close();
} catch (IOException e) {
}
}
}
客户端代码:
public class NioClient {
public static void main(String[] args) {
String username = "lufei";
ClientThread client = new ClientThread(username);
client.start();
BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));
try {
String readline;
while ((readline = sin.readLine()) != null) {
if (readline.equals("bye")) {
client.close();
System.exit(0);
}
client.send(username + ":" + readline);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行测试:
启动运行测试一下效果!
服务端先启动,控制台打印:
服务端启动......
接着启动客户端,控制台打印:
连接服务器端成功!
这时候服务端会打印客户端的连接信息以及用户名等信息:
测试客户端向服务的发送消息,客户端控制台输入Hello 我是lufei! ,这时候服务端会收到发送过来的消息内容:
我们可以再建立一个客户端启动类NioClient2,并将其启动,服务端会收到客户端2的消息:
让客户端1和客户端2之间发送消息:
服务端控制台打印:
这样,一个简单的聊天室就搭建成功了,如果小伙伴想自行完善,可以把代码拷贝一下,自己去设计自己想要实现的聊天室功能。
熟悉了NIO通信的小案例之后,我们通过一张图来分析一下其实现原理:
从图中可以看出,当有读或写等任何注册的事件发生时,可以从 Selector中获得相应的SelectionKey,同时从SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。
二、Selector 选择器
1、Selector 继承体系
NIO中实现非阻塞 I/O的核心对象是Selector,Selector是注册各种I/O事件的地方,而且当那些事件发生时,就是Seleetor告诉我们所发生的事件。
使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:
- (1) 向Selector对象注册感兴趣的事件。
- (2) 从Selector中获取感兴趣的事件。
- (3) 根据不同的事件进行相应的处理。
2、Selector 选择器的创建
在聊天室案例的NioServer服务端类中,选择器的初始化创建位于其构造函数中:
public NioServer() throws IOException {
this.selector = Selector.open();
}
Selector 可以通过它自己的open() 方法创建,借助java.nio.channels.spi.SelectorProvider 类创建一个新的 Selector 选择器。也可以通过实现java.nio.channels.spi.SelectorProvider 类的抽象方法openSelector() 来自定义实现一个Selector。Selector 一旦创建将会一直处于 open 状态直到调用了close() 方法为止。
我们跟进这个open() 方法:
public abstract class Selector implements Closeable {
protected Selector() {}
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
...
}
继续向下跟进SelectorProvider.provider() 方法:
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
我们跟进DefaultSelectorProvider.create() 方法,它会根据不同的操作系统去创建不同的SelectorProvider:
- 如果是Windows操作系统,则创建的是WindowsSelectorProvider对象。
- 如果是MacOS操作系统,则创建的是KQueueSelectorProvider对象。
- 如果是Linux操作系统,则创建的是EPollSelectorProvider对象。
例如我使用的是Mac系统,那么跟进create() 方法时,进入如下代码:
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
return new KQueueSelectorProvider();
}
}
public class KQueueSelectorProvider extends SelectorProviderImpl {
public KQueueSelectorProvider() {
}
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}
}
继续跟进,进入KQueueSelectorImpl类内部:
class KQueueSelectorImpl extends SelectorImpl {
protected int fd0;
protected int fd1;
......
KQueueSelectorImpl(SelectorProvider var1) {
super(var1);
long var2 = IOUtil.makePipe(false);
this.fd0 = (int)(var2 >>> 32);
this.fd1 = (int)var2;
......
}
......
}
问题:为什么在不同操作系统平台,Provider不同呢?
因为网络IO是跟操作系统息息相关的,不同的操作系统的实现可能都不一样。比如我们在Linux操作系统安装的JDK版本,和Windows操作系统上就不太一样。
3、Selector 选择器绑定 Channel 管道
如聊天室案例中服务端 NioServer,Channel 管道与 Selector 选择器的绑定通过如下方式:
private ServerSocketChannel getServerChannel(Selector selector) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
return serverSocketChannel;
}
注意:Channel必须是非阻塞模式才能注册到Selector上,所以,无法将一个FileChannel注册到Selector,因为FileChannel没有所谓的阻塞还是非阻塞模式。
管道 Channel 和 选择器 Selector 的关系:
- Selector 通过不断轮询的方式同时监听多个 Channel 的事件,注意,这里是同时监听,一旦有 Channel 准备好了,它就会返回这些准备好了的 Channel,交给处理线程去处理。
- 在NIO编程中,通过 Selector 我们就实现了一个线程同时处理多个连接请求的目标,也可以一定程序降低服务器资源的消耗。
4、管道与选择器之间的桥梁 SelectionKey
我们再来看一下 Selector 与 Channel 的关系图:
如上图所示,将管道与注册到选择器上时,register() 方法需要传递2个参数,一个是 Selector 选择器对象,另一个是管道中事件的类型 SelectionKey,选择器通过该对象的静态变量值去识别不同管道中的事件内容。所以SelectionKey 又可以看作是Channel和Selector之间的一座桥梁,把两者绑定在了一起。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey 有如下4种事件类型:
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
5、SelectionKey 的几个常用方法
参考文章:https://juejin.cn/post/6844903440573792270
5.1、interestOps()方法
- 作用:返回代表需要Selector监控的IO操作的,可以通过以下方法来判断Selector是否对Channel的某种事件感兴趣。
- interest数据集:当前Channel感兴趣的操作,此类操作将会在下一次选择器
select() 操作时被交付,可以通过selectionKey.interestOps(int) 进行方法修改。 - 使用方式如下:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept =
(interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
5.2、readyOps()方法
- 作用:获取此selectionKey键上的ready操作集合,即在当前通道上已经就绪的事件。可以通过
readyOps() 方法获取所有就绪了的事件,也可以通过isXxxable() 方法检查某个事件是否就绪。 - ready数据集:表示此选择键上,已经就绪的操作,每次
select() 时,选择器都会对ready集合进行更新,外部程序无法修改此集合。 - 使用方式如下:
int readySet = selectionKey.readyOps();
boolean isReadable();
boolean isWritable():
boolean isConnectable():
boolean isAcceptable():
5.3、channel()、selector()方法
- 作用:通过
channel()、selector() 方法可以获取绑定的通道Channel和选择器Selector,代码案例如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
5.4、attachment()方法
可以将一个或者多个附加对象绑定到SelectionKey上,以便容易的识别给定的通道。通常有两种方式:
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
selectionKey.attach(theObject);
绑定之后,可通过对应的SelectionKey取出该对象:
selectionKey.attachment();
如果要取消该对象,则可以通过该种方式:
selectionKey.attach(null)
需要注意的是如果附加的对象不再使用,一定要人为清除,因为垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。
一个单独的通道可被注册到多个选择器中,有些时候我们需要通过isRegistered() 方法来检查一个通道是否已经被注册到任何一个选择器上。 通常来说,我们并不会这么做。
有点类似于ThreadLocal,可以让不同的线程都拥有一份自己的变量副本,且相互隔离,但是区别在于ThreadLocal的对象过期后会被自动回收的!
6、Selector 的几个常用方法
这一部分参考自这 2 篇文章:
6.1、select()方法
一旦将一个或多个Channel注册到Selector上了,我们就可以调用它的select() 方法了,它会返回注册时感兴趣的事件中就绪的事件。
select() 方法有三种变体:
select() ,无参数,阻塞到至少有一个通道在你注册的事件上就绪了才返回。(当然是我们注册的感兴趣的事件)。select(timeout) ,带超时,阻塞直到某个Channel有就绪的事件了,或者超时了才返回。selectNow() ,立即返回,非阻塞,只要有通道就绪就立刻返回。
select() 方法返回的 int 值表示有多少通道已经就绪,是自上次调用select() 方法后有多少通道变成就绪状态。之前在select() 调用时进入就绪的通道不会在本次调用中被记入,而在前一次select() 调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select() 方法,如果有一个通道变成就绪状态,返回了1,若再次调用select() 方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的Channel没有做任何操作,现在就有两个就绪的通道,但在每次select() 方法调用之前,只有一个通道就绪了。
一旦调用select() 方法,并且返回值不为 0 时,则可以通过调用Selector的selectedKeys() 方法来访问已选择键集合。如下:
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
6.2、selectKeys()方法
Selector.selectKeys() ,可以获取该选择器相关联的SelectionKey集合,通过遍历这些SelectorKey,可以进一步获取其感情兴趣的事件类型,以及其关联的Channel通道。
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
} else if (key.isConnectable()) {
} else if (key.isReadable()) {
} else if (key.isWritable()) {
}
it.remove();
}
最后,一定要记得调用it.remove(); 移除已经处理的 SelectionKey。
6.3、wakeUp()方法
前面我们说了调用select() 方法时,调用者线程会进入阻塞状态,直到有就绪的 Channel 才会返回。其实也不一定,wakeup() 就是用来破坏规则的,可以在另外一个线程调用wakeup() 方法强行唤醒这个阻塞的线程,这样select() 方法也会立即返回。
如果调用wakeup() 时并没有线程阻塞在select() 上,那么,下一次调用select() 将立即返回,不会进入阻塞状态。这跟LockSupport.unpark() 方法是比较类似的。
6.4、close()方法
调用 close() 方法将会关闭 Selector,同时也会将关联的 SelectionKey 失效,但不会关闭 Channel。
三、Channel 通道
1、Channel 继承体系
通道是一个对象,通过它可以读取和写入数据,当然所有数据都通过Buffer对象来处理。我们永远不会将字节直接写入通道,而是将数据写入包含一个或者多个字节的缓冲区。同样也不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
使用NIO读取数据可以分为下面三个步骤:
- (1)从FileInputStream获取Channel。
- (2)创建Buffer。
- (3)将数据从Channel读取到Buffer中。
使用NIO写入数据同样分为三个步骤:
- (1)从FileInputStream获取Channel。
- (2)创建Buffer。
- (3)将数据从Channel写入Buffer。
2、Channel 与 Steam 流的区别
参考自:https://blog.csdn.net/tangtong1/article/details/103341597
BIO是面向流(Stream)编程的,流又分成 InputStream 和 OutputStream ,那么 Channel 和 Stream 有什么区别呢?
- Channel 可以同时支持读和写,而 Stream 只能支持单向的读或写(所以分成InputStream和OutputStream)。
- Channel支持异步读写,Stream通常只支持同步。
- Channel总是读向(read into)Buffer,或者写自(write from)Buffer(有点绕,以 Channel 为中心,从 Channel 中读出数据到 Buffer,从 Buffer 中往 Channel 写入数据),可以参考如下代码:
将 Channel 管道中的数据读取到 Buffer:
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(50);
channel.read(buffer);
buffer.flip();
String msg = decoder.decode(buffer).toString();
System.out.println("收到:" + msg);
}
将数据通过 Buffer 写入到 Channel 管道:
public void send(String msg) {
try {
SocketChannel client = (SocketChannel) clientKey.channel();
client.write(encoder.encode(CharBuffer.wrap(msg)));
} catch (Exception e) {
e.printStackTrace();
}
}
由此可知,管道中的数据传输、从管道中读取/写入数据,数据本身都是需要包装再 Buffer 缓冲中的!
资料参考:
|