IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 【专栏】RPC系列(番外)-“土气”的IO实现 -> 正文阅读

[网络协议]【专栏】RPC系列(番外)-“土气”的IO实现

公众号【离心计划】,掌握一手文章,一起离开地球表面

【RPC系列合集】

【专栏】RPC系列(理论)-夜的第一章

【专栏】RPC系列(理论)-协议与序列化

【专栏】RPC系列(理论)-动态代理

【专栏】RPC系列(实战)-摸清RPC骨架

【专栏】RPC系列(实战)-优雅的序列化

【专栏】RPC系列(番外)-IO模型与线程模型

前言

? ? 前一章番外篇中我们讲解了几种IO模型和线程模型的区别,这一节主要我们动手实现一下从BIO到NIO的过程,实际感受一下。

BIO

单线程BIO


public static void main(String[] args) {
    byte[] buffer = new byte[1024];
    try {
        ServerSocket serverSocket = new ServerSocket();
        System.out.println("绑定端口:8080");
        serverSocket.bind(new InetSocketAddress(8080));
        System.out.println("等待连接");
        while (true) {
            //阻塞点1:建立连接
            Socket accept = serverSocket.accept();
            System.out.println("-------------------------");
            System.out.println("建立连接成功,读取数据");
            //阻塞点2:等待数据
            accept.getInputStream().read(buffer);
            String data = new String(buffer);
            System.out.println("接收到数据:" + data);
            System.out.println("发送数据...");
            accept.getOutputStream().write("我收到数据啦".getBytes(StandardCharsets.UTF_8));
            System.out.println("发送完成");
            System.out.println("-------------------------");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

? ? 这是单线程阻塞IO的实现的Server,上面有两个阻塞点:accept和read。这也是我们在番外篇中理解的阻塞IO,由于没有连接的到来或者没有IO事件的到来就会阻塞在这里,然后我们关注一点ServerSocket的包路径:package java.net,并不是在NIO包下。

? ? 这种单线程阻塞模型的问题很明显,只能连接一个客户端,其他的连接都会被阻塞。

多线程BIO

? ? 基于上面的问题,由于单线程导致的并发连接阻塞问题,那么我们给每个连接分配一个线程去处理不就行了?按照这个思路,代码就是这样


public static void main(String[] args) {
  byte[] buffer = new byte[1024];
  try {
      ServerSocket serverSocket = new ServerSocket();
      System.out.println("绑定端口:8080");
      serverSocket.bind(new InetSocketAddress(8080));
      System.out.println("等待连接");
      while (true) {
          Socket accept = serverSocket.accept();
          //连接到了后分配一个线程处理这个连接后续的io操作
          Thread thread = new Thread(new Runnable() {
              @Override
              public void run()
              {
                  long threadId = Thread.currentThread().getId();
                  System.out.println("-------------------------");
                  System.out.printf("[%s] 建立连接成功\n", threadId);
                  while (true){
                      try {
                          int read = accept.getInputStream().read(buffer);
                          if(read>0){
                              String data = new String(buffer);
                              System.out.println(String.format("[%s] 接收到数据:", threadId) + data);
                              System.out.println("发送数据...");
                              accept.getOutputStream().write(String.format("Hello!I have received your message:%s",data).getBytes(StandardCharsets.UTF_8));
                          }else{
                              accept.close();
                              break;
                          }
                      } catch (IOException e) {
                          e.printStackTrace();
                          break;
                      }
                      System.out.println("发送完成");
                      System.out.println("-------------------------");
                  }
              }
          });
            thread.start();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

? ? 这样的做法在大量连接出现的情况下就会导致C10K问题,也就是服务器创建线程数量的上限引起的瓶颈,通常为了不浪费资源会使用线程池来做线程公用,但是存在两个基本问题:

  1. 线程模型问题。线程池中线程数量够还好,不够还是得阻塞。

  2. IO模型问题,accept这一步的阻塞,一定会导致并发连接出现等待的问题(再短也是等待)。read这一步的阻塞,依旧会导致大量线程的切换。

NIO

? ? NIO与BIO的区别在于,accept与read不再阻塞。从阻塞的定义来看:调用方不等待被调用方的真实返回,所以是成立的。


public static void main(String[] args) {
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    try {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        System.out.println("绑定端口:8080");
        serverSocketChannel.bind(new InetSocketAddress(8080));
        serverSocketChannel.configureBlocking(false);
        System.out.println("等待连接");
        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                socketChannel.configureBlocking(false);
                System.out.println("-------------------------");
                System.out.println("建立连接成功,读取数据");
                int hasData = socketChannel.read(buffer);
                if (hasData != 0) {
                    String data = new String(buffer.array());
                    System.out.println("接收到数据:" + data);
                    System.out.println("发送数据...");
                    //重置buffer
                    buffer.flip();
                    buffer.put("我拿到数据啦".getBytes(StandardCharsets.UTF_8));
                    socketChannel.write(buffer);
                    System.out.println("发送完成");
                    System.out.println("-------------------------");
                } else {
                    System.out.println("还没数据啊");
                }
            } else {
                System.out.println("还没有客户端连接,但是不阻塞");
                Thread.sleep(2000);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

? ? 我们看ServerSocketChannel的包路径在java.nio下面,说明这是nio的socket实现。当我们调用了accept时无论有无连接都会直接返回,这就是nio与bio的区别所在,没有连接就会返回null。

? ? 这个是单线程的NIO模式,问题其实可以想象到,客户端A建立了连接后,客户端B也马上建立了连接,虽然他们之间不会阻塞了,但是Server还没等AB发送数据就已经下一次循环了,Server根本没记住A和B的连接。

NIO

? ? 要记住每个连接,必然要把连接存储下来,每个连接建立后,放到一个socketList里面,然后有连接进来就放到这个list中,每次判断后都隐形list的循环,看看其中的每个socket是否都有数据进来


publicstatic void main(String[] args) throws InterruptedException {
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     List<SocketChannel> socketList = newArrayList<SocketChannel>();
     try{
         //Java为非阻塞设置的类
         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
         serverSocketChannel.bind(newInetSocketAddress(8080));
         //设置为非阻塞
         serverSocketChannel.configureBlocking(false);
         while(true) {
             SocketChannel socketChannel = serverSocketChannel.accept();
             if(socketChannel==null) {
                 //表示没人连接
                 System.out.println("还没连接");
                 Thread.sleep(5000);
             }else{
                 System.out.println("建立连接成功");
                 socketList.add(socketChannel);
             }
             for(SocketChannel socket:socketList) {
                 socket.configureBlocking(false);
                 int effective = socket.read(byteBuffer);
                 if(effective!=0) {
                         byteBuffer.flip();//切换模式  写-->读
                         String content = Charset.forName("UTF-8").decode(byteBuffer).toString();
                         System.out.println("接收到消息:"+content);
                         byteBuffer.clear();
                 }else{
                         System.out.println("没收到消息呢");
                 }
             }
         }
       } catch(IOException e) {
             e.printStackTrace();
     }
}

? ? 其实这种“将连接暂存起来”的做法,就是IO多路复用的做法,上面我们看起来很挫的代码体现的也是复用的思想,即一个对外的socket可以同时保持多个连接,并且还能通过轮询的方式监听每个连接有没有数据。说到轮询,问题又来了,这边的for循环其实是同步的,也就意味着如果队列第一个接收的数据特别大,那么就会导致第二个连接的数据读取延迟,越往后延迟越严重,这倒是有点http队头阻塞的感觉,解决这个问题的办法其实在于使用正确的线程模型,也就是在上一篇番外篇中提到的Reactor模型,让处理连接和处理IO事件的线程分离,所以IO复用+Reactor正是最后的大招。

? ? 在引出代码之前我们还需要回想一个问题:IO复用+Reactor的模式可以使用BIO么?BIO是阻塞的,accept和read都阻塞,那么拿accept来说,我们根本不可能有下面这段代码,因为阻塞被唤醒后必然已经是有连接进来了,read操作亦是如此,所以显然这个问题的答案是不能。

if(socketChannel==null) {
   //表示没人连接
   System.out.println("还没连接");
   Thread.sleep(5000);
}else{
   System.out.println("建立连接成功");
   socketList.add(socketChannel);
}

IO复用+Reactor

? ? 这一块我们主要复现前一篇番外篇(再贴一下【专栏】RPC系列(番外)-IO模型与线程模型)中所说的三种Reactor线程模型,直观地感受一下,首先是单线程地Reactor

Reactor单线程

  1. 创建监听的channel

  2. 创建selector,并注册channel的连接事件到selector上

  3. 调用selector.select()阻塞等待连接

  4. 分流连接channel的连接、可读、可写事件分别处理

public class Server {
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    String str;
    public void start() throws IOException {
        // 创建channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 服务器配置为非阻塞
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("localhost", 8001));
        // 创建selector
        selector = Selector.open();
        // 将channel注册到selector并监听连接事件
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (!Thread.currentThread().isInterrupted()) {
            //这一步是阻塞的
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
                keyIterator.remove();
            }
        }
    }

    private void write(SelectionKey key) throws IOException, ClosedChannelException {
        SocketChannel channel = (SocketChannel) key.channel();
        System.out.println("write:"+str);

        sendBuffer.clear();
        sendBuffer.put(str.getBytes());
        //转成读模式,重置position
        sendBuffer.flip();
        channel.write(sendBuffer);
        channel.register(selector, SelectionKey.OP_READ);
    }

    private void read(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        this.readBuffer.clear();
        int numRead;
        try {
            numRead = socketChannel.read(this.readBuffer);
        } catch (IOException e) {
            key.cancel();
            socketChannel.close();
            return;
        }
        str = new String(readBuffer.array(), 0, numRead);
        System.out.println(str);
        socketChannel.register(selector, SelectionKey.OP_WRITE);
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = ssc.accept();
        clientChannel.configureBlocking(false);
        //将这个连接channel注册到selector上
        clientChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("[New Client Connected]"+clientChannel.getRemoteAddress());
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Server started...");
        new Server().start();
    }
}

? ? 这里代码我们没有用复杂的Reactor模型,可以看到处理读写事件与select阻塞是一个线程,要想变成多线程Reactor模型,其实只要创建一个多线程worker线程池,在select后把处理IO事件的逻辑交给线程池处理就行了。

单Reactor多线程Reactor

? ? 为了不浪费篇幅,省去了部分去上面重复地代码。我们创建了一个线程池(按照规范不建议这样创建,这里为了方便)并接收IO任务、


public class ServerMulti {
    private Selector selector;
    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    private ExecutorService executorService = Executors.newFixedThreadPool(50);
    String str;

    public void start() throws IOException {
        // 创建channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 服务器配置为非阻塞
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress("localhost", 8001));

        // 通过open()方法找到Selector
        selector = Selector.open();
        // 注册连接事件到selector
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (!Thread.currentThread().isInterrupted()) {
            //这一步是阻塞的
            selector.select();
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = keys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    executorService.submit(() -> {
                        try {
                            read(key);
                            write(key);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                }
                keyIterator.remove();
            }
        }
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        String data = "Hello Client";
        System.out.println("write:" + data);

        sendBuffer.clear();
        sendBuffer.put(data.getBytes());
        //转成读模式,重置position
        sendBuffer.flip();
        channel.write(sendBuffer);
        channel.register(selector, SelectionKey.OP_READ);
    }
}

主从Reactor多线程Reactor

? ? 主从Reactor其实是将Reactor拆分,主Reactor只负责获取连接,从Reactor负责监听连接事件的IO事件并分配给工作线程处理对应的IO事件,为了节省篇幅去除了与上面重复的方法。

public class MasterSlaveServer {
    static ExecutorService followerExecutor = Executors.newFixedThreadPool(10);
    static ExecutorService workerExecutor = Executors.newFixedThreadPool(10);

    static class MasterReactor {
        private Selector selector;
        int slaveNum = 4;
        private List<SlaveReactor> slaveReactors = new ArrayList<>(slaveNum);

        public void start() throws IOException {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(8001));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //初始化slaveReactor
            for (int i = 0; i < slaveNum; i++) {
                SlaveReactor reactor = new SlaveReactor();
                slaveReactors.add(reactor);
                reactor.run();
            }
            int index = 0;
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = keys.iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    if (key.isValid() && key.isAcceptable()) {
                        //为新连接创建channel并分配给slaveReactor
                        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                        SocketChannel clientChannel = ssc.accept();
                        clientChannel.configureBlocking(false);
                        SlaveReactor slaveReactor = slaveReactors.get(index++ % slaveNum);
                        slaveReactor.registerChannel(clientChannel);
                    }
                    keyIterator.remove();
                }
            }
        }
    }

    static class SlaveReactor {
        private Selector selector;

        public SlaveReactor() throws IOException {
            selector = Selector.open();
            run();
        }

        //注册连接channel到当前SlaveReactor
        public void registerChannel(SocketChannel socketChannel) throws ClosedChannelException {
            socketChannel.register(selector, SelectionKey.OP_READ);
        }

        public void run() {
            followerExecutor.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        //这一步是阻塞的
                        selector.select(500);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = keys.iterator();
                    while (keyIterator.hasNext()) {
                        SelectionKey key = keyIterator.next();
                        if (!key.isValid()) {
                            continue;
                        }
                        if (key.isReadable()) {
                            workerExecutor.submit(() -> {
                                try {
                                    read(key);
                                    write(key);
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            });
                        }
                        keyIterator.remove();
                    }
                }
            });
        }
    }
    public static void main(String[] args) throws IOException {
        MasterReactor masterReactor = new MasterReactor();
        masterReactor.start();
    }
}

小结

? ?今天我们对照上一篇番外篇文章实现了各种IO模型与线程模型,相信大家对IO方面有了更深的认识,这是为更容易上手Netty,Netty也是基于java nio,做了很多抽象与管理工作,因此也产出了很多组件,ok周末快乐~

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-09-21 01:01:36  更:2022-09-21 01:03:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/25 20:32:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码