前言
NioEndpoint在Tomcat中扮演的角色
负责接收请求的连接,并封装成SocketProcessor ,最后交给线程池去执行;NioEndpoint 组件是 I/O 多路复用模型的一种实现。
一、NioEndpoint总体概览
类结构
java.lang.Object
org.apache.tomcat.util.net.AbstractEndpoint<S,U>
org.apache.tomcat.util.net.AbstractJsseEndpoint<NioChannel,SocketChannel>
org.apache.tomcat.util.net.NioEndpoint
Tomcat 的 NioEndpoint 包含 LimitLatch 、Acceptor 、Poller 、SocketProcessor 和 Executor 共 5 个组;典型的主从网络I/O处理结构,
- 其中LimitLatch负责限制连接请求;
- Acceptor是主从结构中的“主”结构,仅负责接收并分发连接;
- Poller是主从结构中的“从”结构,负责监听Acceptor分发连接的I/O事件,本质也就是Selector,当监听有I/O事件就绪后,将对应的连接封装成SocketProcessor交给线程池处理
- SocketProcessor:具体连接的封装
- Executor:真正处理连接的线程池

二、NioEndpoint核心实现
1. LimitLatch组件
LimitLatch 用来控制连接个数,当连接数到达最大时阻塞线程,直到后续组件处理完一个连接后将连接数减 1。到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收,其中操作系统可接收的数量可通过acceptCount 参数限制;
LimitLatch核心源码:
public class LimitLatch {
private final LimitLatch.Sync sync;
private final AtomicLong count;
private volatile long limit;
private volatile boolean released = false;
public void countUpOrAwait() throws InterruptedException {
if (log.isDebugEnabled()) {
log.debug("Counting up[" + Thread.currentThread().getName() + "] latch=" + this.getCount());
}
this.sync.acquireSharedInterruptibly(1);
}
public long countDown() {
this.sync.releaseShared(0);
long result = this.getCount();
if (log.isDebugEnabled()) {
log.debug("Counting down[" + Thread.currentThread().getName() + "] latch=" + result);
}
return result;
}
...
private class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1L;
public Sync() {
}
protected int tryAcquireShared(int ignored) {
long newCount = LimitLatch.this.count.incrementAndGet();
if (!LimitLatch.this.released && newCount > LimitLatch.this.limit) {
LimitLatch.this.count.decrementAndGet();
return -1;
} else {
return 1;
}
}
protected boolean tryReleaseShared(int arg) {
LimitLatch.this.count.decrementAndGet();
return true;
}
}
}
LimitLatch 内步自定义扩展了 AQS 的Sync 内部类
2. Acceptor组件
Acceptor 实现了 Runnable 接口,因此可以跑在单独线程里。一个端口号只能对应一个 ServerSocketChannel ,因此这个 ServerSocketChannel 是在多个 Acceptor 线程之间共享的,它是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定,初始化过程:
protected void initServerSocket() throws Exception {
if (!this.getUseInheritedChannel()) {
this.serverSock = ServerSocketChannel.open();
this.socketProperties.setProperties(this.serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(this.getAddress(), this.getPortWithOffset());
this.serverSock.socket().bind(addr, this.getAcceptCount());
} else {
...
}
this.serverSock.configureBlocking(true);
}
初始化反映的两个重要信息
bind 方法的第二个参数表示操作系统的等待队列长度;当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过 acceptCount 参数配置,默认是 100。ServerSocketChannel 被设置成阻塞模式,它是以阻塞的方式接收连接的
Acceptor核心源码:
public class Acceptor<U> implements Runnable {
private final AbstractEndpoint<?, U> endpoint;
private String threadName;
protected volatile Acceptor.AcceptorState state;
...
public void run() {
while(this.endpoint.isRunning()) {
while(this.endpoint.isPaused() && this.endpoint.isRunning()) {
...
try {
this.endpoint.countUpOrAwaitConnection();
if (!this.endpoint.isPaused()) {
Object socket = null;
try {
socket = this.endpoint.serverSocketAccept();
} catch (Exception var6) {
this.endpoint.countDownConnection();
if (!this.endpoint.isRunning()) {
break;
}
this.handleExceptionWithDelay(errorDelay);
throw var6;
}
errorDelay = 0;
if (this.endpoint.isRunning() && !this.endpoint.isPaused()) {
if (!this.endpoint.setSocketOptions(socket)) {
this.endpoint.closeSocket(socket);
}
} else {
this.endpoint.destroySocket(socket);
}
}
} catch (Throwable var7) {
...
}
}
this.state = Acceptor.AcceptorState.ENDED;
}
...
}
ServerSocketChannel 通过 accept () 接受新的连接,accept() 方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollerEvent 对象中,并将 PollerEvent 对象压入 Poller 的 Queue 里
3. Poller组件
Poller 本质是一个 Selector ,它内部维护一个 Queue
private final SynchronizedQueue<NioEndpoint.PollerEvent> events = new SynchronizedQueue();
-
SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。同时有多个 Poller 线程在运行,每个 Poller 线程都有自己的 Queue。每个 Poller 线程可能同时被多个 Acceptor 线程调用来注册 PollerEvent。同样 Poller 的个数可以通过 pollers 参数配置。 -
Poller 不断的通过内部的 Selector 对象向内核查询 Channel 的状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel
Poller核心实现:
public class Poller implements Runnable {
private Selector selector = Selector.open();
private final SynchronizedQueue<NioEndpoint.PollerEvent> events = new SynchronizedQueue();
...
public void register(NioChannel socket, NioEndpoint.NioSocketWrapper socketWrapper) {
...
if (event == null) {
event = new NioEndpoint.PollerEvent(socket, 256);
} else {
event.reset(socket, 256);
}
this.addEvent(event);
}
public void run() {
while(true) {
boolean hasEvents = false;
label59: {
try {
if (!this.close) {
hasEvents = this.events();
if (this.wakeupCounter.getAndSet(-1L) > 0L) {
this.keyCount = this.selector.selectNow();
} else {
this.keyCount = this.selector.select(NioEndpoint.this.selectorTimeout);
}
this.wakeupCounter.set(0L);
}
...
} catch (Throwable var6) {
...
}
}
...
Iterator iterator = this.keyCount > 0 ? this.selector.selectedKeys().iterator() : null;
while(iterator != null && iterator.hasNext()) {
SelectionKey sk = (SelectionKey)iterator.next();
NioEndpoint.NioSocketWrapper socketWrapper = (NioEndpoint.NioSocketWrapper)sk.attachment();
if (socketWrapper == null) {
iterator.remove();
} else {
iterator.remove();
this.processKey(sk, socketWrapper);
}
}
...
}
}
}
4. SocketProcessor
Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务,主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,
Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel ,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper ,Http11Processor 只调用 SocketWrapper 的方法去读写数据。
5. Executor
Executor 是 Tomcat 定制版的线程池,它负责创建真正干活的工作线程,执行 SocketProcessor 的 run 方法,也就是解析请求并通过容器来处理请求,最终会调用到我们的 Servlet
|