IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Tomcat之NioEndpoint组件:连接处理第一站 -> 正文阅读

[Java知识库]Tomcat之NioEndpoint组件:连接处理第一站


前言

NioEndpoint在Tomcat中扮演的角色

负责接收请求的连接,并封装成SocketProcessor,最后交给线程池去执行;NioEndpoint 组件是 I/O 多路复用模型的一种实现。


一、NioEndpoint总体概览

类结构

java.lang.Object
      org.apache.tomcat.util.net.AbstractEndpoint<S,U>
          org.apache.tomcat.util.net.AbstractJsseEndpoint<NioChannel,SocketChannel>
               org.apache.tomcat.util.net.NioEndpoint

Tomcat 的 NioEndpoint 包含 LimitLatchAcceptorPollerSocketProcessorExecutor 共 5 个组;典型的主从网络I/O处理结构,

  • 其中LimitLatch负责限制连接请求;
  • Acceptor是主从结构中的“主”结构,仅负责接收并分发连接;
  • Poller是主从结构中的“从”结构,负责监听Acceptor分发连接的I/O事件,本质也就是Selector,当监听有I/O事件就绪后,将对应的连接封装成SocketProcessor交给线程池处理
  • SocketProcessor:具体连接的封装
  • Executor:真正处理连接的线程池
    在这里插入图片描述

二、NioEndpoint核心实现

1. LimitLatch组件

LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收,其中操作系统可接收的数量可通过acceptCount参数限制;

LimitLatch核心源码:

public class LimitLatch {
    private final LimitLatch.Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    private volatile boolean released = false;
    // 新增一个连接累加1,线程可能会阻塞
    public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + this.getCount());
        }

        this.sync.acquireSharedInterruptibly(1);
    }
    // 释放一个连接 -1,前面阻塞的线程可能被唤醒
    public long countDown() {
        this.sync.releaseShared(0);
        long result = this.getCount();
        if (log.isDebugEnabled()) {
            log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
        }

        return result;
    }
     
    ...
     
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;

        public Sync() {
        }
        // 如果当前连接数 count 小于 limit,线程能获取锁,返回 1,否则返回 -1
        protected int tryAcquireShared(int ignored) {
            long newCount = LimitLatch.this.count.incrementAndGet();
            if (!LimitLatch.this.released && newCount > LimitLatch.this.limit) {
                LimitLatch.this.count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }

        protected boolean tryReleaseShared(int arg) {
            LimitLatch.this.count.decrementAndGet();
            return true;
        }
    }
}

LimitLatch 内步自定义扩展了 AQSSync内部类

2. Acceptor组件

Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定,初始化过程:

    protected void initServerSocket() throws Exception {
        if (!this.getUseInheritedChannel()) {
            this.serverSock = ServerSocketChannel.open();
            this.socketProperties.setProperties(this.serverSock.socket());
            InetSocketAddress addr = new InetSocketAddress(this.getAddress(), this.getPortWithOffset());
            this.serverSock.socket().bind(addr, this.getAcceptCount());
        } else {
           ...
        }
        this.serverSock.configureBlocking(true);
    }

初始化反映的两个重要信息

  • bind 方法的第二个参数表示操作系统的等待队列长度;当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。
  • ServerSocketChannel 被设置成阻塞模式,它是以阻塞的方式接收连接的

Acceptor核心源码:

public class Acceptor<U> implements Runnable {

    private final AbstractEndpoint<?, U> endpoint;
    private String threadName;
    protected volatile Acceptor.AcceptorState state;

    ...
    
    public void run() {
        while(this.endpoint.isRunning()) {
            while(this.endpoint.isPaused() && this.endpoint.isRunning()) {
 
            ...

            try {
                // 由limitLatch限制连接,可能阻塞
                this.endpoint.countUpOrAwaitConnection();
                if (!this.endpoint.isPaused()) {
                    Object socket = null;

                    try {
                        // 阻塞的接受socket连接(初始化的时候设置成阻塞的)
                        socket = this.endpoint.serverSocketAccept();
                    } catch (Exception var6) {
                        this.endpoint.countDownConnection();
                        if (!this.endpoint.isRunning()) {
                            break;
                        }

                        this.handleExceptionWithDelay(errorDelay);
                        throw var6;
                    }

                    errorDelay = 0;
                    if (this.endpoint.isRunning() && !this.endpoint.isPaused()) {
                        // 实质上就是将新连接放入poller队列
                        if (!this.endpoint.setSocketOptions(socket)) {
                            this.endpoint.closeSocket(socket);
                        }
                    } else {
                        this.endpoint.destroySocket(socket);
                    }
                }
            } catch (Throwable var7) {
            
              ...
              
            }
        }

        this.state = Acceptor.AcceptorState.ENDED;
    }
    ...
}

ServerSocketChannel 通过 accept() 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue

3. Poller组件

Poller 本质是一个 Selector,它内部维护一个 Queue

 private final SynchronizedQueue<NioEndpoint.PollerEvent> events = new SynchronizedQueue();
  • SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。

  • Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel

Poller核心实现:

    public class Poller implements Runnable {
        private Selector selector = Selector.open();
        private final SynchronizedQueue<NioEndpoint.PollerEvent> events = new SynchronizedQueue();

        ...
        // 将socket包装成PollerEvent事件放入队列
        public void register(NioChannel socket, NioEndpoint.NioSocketWrapper socketWrapper) {
            
            ...
            
            if (event == null) {
                event = new NioEndpoint.PollerEvent(socket, 256);
            } else {
                event.reset(socket, 256);
            }

            this.addEvent(event);
        }

        // poller核心实现,也就是这里扮演着selector的角色,IO多路复用的select()实现
        public void run() {
            while(true) {
                boolean hasEvents = false;
                // 第一步,检测是否有准备就绪的IO事件
                label59: {
                    try {
                        if (!this.close) {
                            hasEvents = this.events();
                            if (this.wakeupCounter.getAndSet(-1L) > 0L) {
                                this.keyCount = this.selector.selectNow();
                            } else {
                                this.keyCount = this.selector.select(NioEndpoint.this.selectorTimeout);
                            }

                            this.wakeupCounter.set(0L);
                        }

                        ...
                        
                    } catch (Throwable var6) {
                        ...
                    }
                }
                ...
            
                // 第二步处理已经就绪的IO事件
                Iterator iterator = this.keyCount > 0 ? this.selector.selectedKeys().iterator() : null;
                while(iterator != null && iterator.hasNext()) {
                    SelectionKey sk = (SelectionKey)iterator.next();
                    NioEndpoint.NioSocketWrapper socketWrapper = (NioEndpoint.NioSocketWrapper)sk.attachment();
                    if (socketWrapper == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        // 封装成NioSocketWrapper,最终交给线程池处理
                        this.processKey(sk, socketWrapper);
                    }
                }
                ...
            }
        }

    }

4. SocketProcessor

Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,

Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannelSocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

5. Executor

Executor 是 Tomcat 定制版的线程池,它负责创建真正干活的工作线程,执行 SocketProcessorrun 方法,也就是解析请求并通过容器来处理请求,最终会调用到我们的 Servlet

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-20 14:57:12  更:2021-08-20 14:59:38 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 3:43:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码