1、BIO(Blocking IO)
1.1 介绍
BIO是同步阻塞IO,当有客户端连接进来时,需要开辟一个线程给这个客户端,所以当连接过高时,会对服务器造成负载过高甚至卡死,
1.2 Java代码示例
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newCachedThreadPool();
ServerSocket ss = new ServerSocket(8888);
while (true) {
// 阻塞,等待客户端连接
Socket client = ss.accept();
// 连接成功后开辟一个线程
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "成功建立连接....");
InputStream inputStream;
try {
inputStream = client.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
// 循环接收客户端发来的消息
while(true){
// 阻塞,等待客户端发送消息
String s = br.readLine();
if(null == s) {
client.close();
System.out.print(Thread.currentThread().getName() + "客户端断开连接...");
break;
}
System.out.println(Thread.currentThread().getName() + "客户端发来消息:" + s);
}
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
2、NIO(Non-Blocking IO)
2.1 介绍
NIO是同步非阻塞IO,在等待客户端连接时,会有状态返回,状态代表有连接还是没连接,相比BIO,NIO不需要对每个客户端开辟一个线程进行通信管理,只需要一个线程就能实现的IO模型。在少量的网络连接中NIO可能是个优势,但在大量的网络连接场景中,由于每一次询问channel是否有消息读取时都是一次系统调用,这会造成频繁的用户态到内核态的切换,并且并不是每一个连接都会发生消息,这会导致系统资源占用过多,这就是C10K的问题
2.2 Java代码示例
public static void main(String[] args) throws IOException {
LinkedList<SocketChannel> clientList = new LinkedList<>();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(9999));
// 设置不阻塞,不设置的话,没有连接就会阻塞,设置为false时没有连接就会返回null
ssc.configureBlocking(false);
while (true) {
SocketChannel accept = ssc.accept();
if(accept != null) {
// 等待客户端发送消息为不阻塞
accept.configureBlocking(false);
clientList.add(accept);
System.out.println("客户端" + accept.socket().getPort() + "连接成功");
}
// 设置缓冲区容量,接收的对象超过容量就不能再写入了
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
for(SocketChannel sc : clientList) {
// 如果不设置accept.configureBlocking(false) 此处会阻塞
int read = sc.read(byteBuffer);
if(read > 0) {
// 翻转缓冲区,在缓冲区写入数据时,缓冲区的position会一直加一,要读取数据则需要将position放到0号位置
byteBuffer.flip();
byte[] dataByte = new byte[byteBuffer.limit()];
byteBuffer.get(dataByte);
String data = new String(dataByte);
System.out.println("客户端" + sc.socket().getPort() + "发送消息:" + data);
} else if(read < 0) {
clientList.remove(sc);
System.out.println("客户端" + sc.socket().getPort() + "断开连接");
}
}
}
}
3、AIO
3.1 介绍
AIO是异步非阻塞IO,监听客户端连接请求和消息发送都是基于回调,IO操作充分依赖于操作系统,所以在不同的操作系统或不同的操作系统的内核版本中,性能也不同,在Netty中曾经支持AIO,但后来去除了,因此实际运用中并不广泛,这里不过多介绍
4、多路复用器
4.1 介绍
多路复用器是同步非阻塞,每次有客户端连接时,将fd维护在一个容器中,要获取需要读写的客户端时,只需要将容器中的fd传入内核中完成遍历即可,只需要一次系统调用,有3种实现方式:select、poll、epoll,适用于有大量的网络连接的场景中
4.2 select介绍
select的指令函数如下
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
nfds:检测前多少个文件描述符,一般是文件描述符最大值+1,因为文件描述符是从0开始
readfds:监控有读数据到达文件描述符的集合
writefds:监控有写数据到达文件描述符的集合
exceptfds:监控有异常的文件描述符集合
timeout:定时阻塞时间,不设置就一直阻塞
其中fd_set最大只能监听1024个fd
4.3 poll介绍
poll与select相比实现方式差不多,但是去掉了监听数量的限制,但同样带来一个问题。每次都需要对所有的fd进行一次线性遍历,时间复杂度为O(n),但并不是所有的fd都有读写数据
4.4 epoll介绍
在Java中默认使用epoll作为多路复用器,epoll的流程:
- 先调用epoll_create创建一个红黑树空间并返回一个文件描述符fd6
- 当有连接进来时调用epoll_ctl将连接的文件描述符与fd6关联起来
- 内核通过中断把有事件的文件描述符copy到一个链表中
- 调用epoll_wait获取存放有事件的链表,这个过程无需传入文件描述符,也无需遍历
4.5 Java代码示例
public static void main(String[] args) throws IOException, InterruptedException {
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(1000));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
// 不传值时会一直阻塞
while (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
if(sk.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) sk.channel();
SocketChannel client = channel.accept();
client.configureBlocking(false);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
client.register(selector, SelectionKey.OP_READ, byteBuffer);
System.out.println("客户端" + client.socket().getPort() + "连接成功");
} else if(sk.isReadable()) {
new Thread(() -> {
SocketChannel channel = (SocketChannel) sk.channel();
ByteBuffer byteBuffer = (ByteBuffer) sk.attachment();
try {
int read = channel.read(byteBuffer);
if (read > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.limit()];
byteBuffer.get(bytes);
byteBuffer.clear();
System.out.println("客户端" + channel.socket().getPort() + "发送消息:" + new String(bytes));
} else if(read < 0) {
sk.cancel();
channel.close();
System.out.println("客户端" + channel.socket().getPort() + "断开连接");
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(2000);
}
}
}
}
|