????????以前大家都是用阻塞式IO来对网络IO进行数据请求,对于不同的IO都要分配一个线程来处理,如果没有数据就会进行等待,从而造成了阻塞,这种方式极大地浪费了资源(如图1)。于是,有人就提出了一个想法,使用一个线程去监控多个IO请求,如果哪一个IO数据准备完毕后就通知相应的线程来处理,这就是selector模型(如图2)。而Java中的selector就是对selector模型的一种实现,用于询问选择已就绪的IO处理任务。
Selector的几个核心的概念
- Channel(通道):用于进行网络传输的通道,网络传输的数据都放在通道中,可以进行写入,也可以进行读取。Channel主要有两种:ServerSocketChannel和SocketChannel,其中ServerSocketChannel是用于服务端开发的,而SocketChannel是用于客户端开发的。
- Selector(选择器):用于进行监控多个通道数据状态。
- SelectableChannel(可选择通道):可以被选择器选择的通道,继承了抽象类SelectableChannel的Channel,而FileChannel没有继承此类,所以不可以被选择器选择。
- SelectionKey(选择键):用于表示通道可以被选择的某种就绪事件状态。选择键的事件主要有以下几种:
OP_READ :可读事件。 ??OP_WRITE :可写事件。 ??OP_CONNECT :客户端连接服务端的事件,一般为创建SocketChannel 客户端channel。 ??OP_ACCEPT :服务端接收客户端连接的事件,一般为创建ServerSocketChannel 服务端channel。
Selector的使用
1.创建选择器
Selector selector = Selector.open();
2.获取通道
ServerSocketChannel channel = ServerSocketChannel.open(); // 创建通道
channel.bind(new InetSocketAddress(8080)); // 绑定端口
channel.configureBlocking(false); // 设置为非阻塞,注册到selector上的通道一定设置为非阻塞,否则会报IllegalBlockingModeException错误
3.将通道注册到选择器上
channel.register(selector, SelectionKey.OP_ACCEPT); // 将通道注册到选择器上,监听可接收事件,对于监听多个事件可以用“按位或”来操作
4.轮询已就绪的事件,并对不同的事件进行处理
while (true) {
int count = selector.select(); // 获取已就绪事件的数量
if (count == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取已就绪键集
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isAcceptable()) {
// 接收处理
} else if (key.isConnectable()) {
// 连接处理
} else if (key.isReadable()) {
// 读取处理
} else if (key.isWritable()) {
// 写入处理
}
key.remove(); // 移除键,防止下次重复处理
}
}
// 注意:key.isWritable只要建立连接,就会触发,所以在设置可写入事件时,在写入之后要改回可监听事件,否则就会死循环
其他关于Selector的知识点
Selector内部总共维护了三组键集:
keys :当前Channel注册在Selector上面的所有的key,可以调用keys() 获取。
selectedKeys :当前Channel所有已就绪的事件,可以调用selectedKeys() 获取。
cancelledKeys :当前Channel所有已取消的事件,主动调用cancel() 方法的事件会放在该集合。
其他一些常用的方法:
Selector#isOpen() :判断selector是否是open状态,如果调用了close() 方法则会返回false 。SelectionKey#isValid() :判断选择键是否有效。Selector#selectNow() :获取是否有就绪的事件,该方法立即返回结果,不会阻塞。Selector#select(long timeout) :在超时时间内,有就绪事件时才会返回,其次超过时间也会返回。Selector#select() :阻塞直到有事件就绪时才会返回Selector#wakeup() :调用该方法会时,阻塞在select() 处的线程会立即返回。即使当前不存在线程阻塞在select() 处,那么下一个select() 方法也会立即返回。Selector#close() :用完Selector 后调用其close() 方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效。
一个Selector的简单示例
/**
仅服务端,客户端可以使用telnet命令
**/
@Test
public void testSelector() throws IOException, InterruptedException {
ServerSocketChannel channel = ServerSocketChannel.open(); // 创建通道
channel.bind(new InetSocketAddress(8080)); // 绑定端口号
channel.configureBlocking(false); // 设置为非阻塞
Selector selector = Selector.open(); // 创建选择器
channel.register(selector, SelectionKey.OP_ACCEPT); // 注册到选择器上
Thread thread = new Thread(() -> {
try {
while (true) {
int count = selector.select(); // 获取已就绪事件数量
if (count == 0) {
continue;
}
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 获取已就绪键集
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (!key.isValid()) { // 判断选择键是否有效
continue;
} else if (key.isAcceptable()) { // 处理接收事件
SocketChannel socket = channel.accept();
socket.configureBlocking(false);
socket.register(selector, SelectionKey.OP_READ);
System.out.println("已注册" + socket);
} else if (key.isReadable()) { // 处理读取事件
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 声明buffer
socketChannel.read(byteBuffer); // 将通道中的数据读取到buffer
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String str = new String(bytes);
str = "\r\nreceive: " + str;
System.out.println(str);
socketChannel.write(ByteBuffer.wrap(str.getBytes())); // 将数据写回通道中
}
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
|