基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码!
上一篇文章中,我们介绍了:Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性。这一篇文章,我们将会介绍ThreadPoolExecutor线程池的execute方法源码,该方法是线程池的核心方法,非常重要。
系列文章:
- Java Executor源码解析(1)—Executor执行框架的概述
- Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性【一万字】
- Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
- Java Executor源码解析(4)—ThreadPoolExecutor线程池submit方法以及FutureTask源码【一万字】
- Java Executor源码解析(5)—ThreadPoolExecutor线程池其他方法的源码
- Java Executor源码解析(6)—ScheduledThreadPoolExecutor调度线程池源码解析【一万字】
1 execute核心提交方法
public void execute(Runnable command)
传递一个Runnable任务对象,然后由线程池对它进行异步执行。没有办法检查Runnable是否执行完毕。如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出RejectedExecutionException;如果任务为null,那么抛出NullPointerException。
execute方法是线程池的绝对核心方法,很多其他内部方法都是为该方法服务的,涉及到的流程和代码非常复杂。
execute方法的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 首先就是command任务的null校验,如果为null直接抛出NullPointerException;
- 调用workerCountOf计算运行线程数量,如果小于corePoolSize,即目前在行线程数小于核心线程数:
- 调用addWorker方法尝试启动新线程去执行command任务,使用corePoolSize作为线程数量上限。如果任务提交给新线程成功,那么直接返回true。
- 到这一步,肯定是addWorker方法返回false。表示可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。
- 到这一步,可能线程池被关闭了,或者线程数量达到了corePoolSize,或者任务队列满了且工作线程数量达到了最大值maximumPoolSize等等情况。判断线程池是否是RUNNING状态,以及尝试加入任务队列。如果线程池还是RUNNING状态并且成功加入任务队列:
- 再次检查线程池是否不是RUNNING状态,以及尝试将任务移除任务队列。若果线程池不是RUNNING状态,并且将任务移除任务队列成功,那么对任务执行拒绝策略。
- 否则,表示线程池还是RUNNING状态,或者线程池不是RUNNING状态,但是任务移除队列失败。判断当前工作线程数是否为0,如果是,尝试启动新线程,从队列中获取任务,使用maximumPoolSize作为线程数量上限,这一步为了维持线程池的活性。 因为有可能在新任务offer加入队列的时候,其他工作线程都因为超时或者SHOTDOWN而被清理,此时仍然需要新开线程对加入进来的任务进行完成。
- 否则,表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是加入任务队列失败,即任务队列满了。尝试启动新线程去执行command任务,使用maximumPoolSize作为线程数量上限。如果失败,则表示线程池不是RUNNING状态,或者线程池是RUNNING状态但是任务队列满了且工作线程数量达到了最大值maximumPoolSize,那么执行拒绝策略。
上面的判断都没有加锁,因此状态可能都是瞬时性的判断,不保证一直有效。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
2 addWorker尝试添加新线程
addWorker尝试新建一个工作线程(Worker)并启动去执行任务,同样作为线程池的核心方法。大概步骤为:检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,首先CAS的增加ctl的线程计数。随后尝试新增一个Worker,通过线程工厂创建一个线程,将参数中的任务作为第一个任务执行并返回true。如果线程工厂无法创建线程或者返回null,那么将会回滚此前的操作,比如减少线程计数,移除创建的Worker等,最后返回false。
addWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,相当于自旋,使用retry标记。检查当前线程池状态是否为运行态RUNNING以及线程数小于给定的最大线程数边界值,如果不满足,那么返回false,如果满足,那么表示可以新增Worker,那么首先CAS的增加ctl的线程计数,成功后退出循环。
- 获取此时最新的ctl值c,通过c获取此时线程池的状态rs。
- 线程池状态校验:如果线程状态的值大于等于SHUTDOWN,那么表示线程池不是运行状态,并且(rs等于SHUTDOWN 并且 firstTask为null 并且 workQueue不为空)这三个条件有一个成立,那么直接返回false,adderWorker失败。即只有RUNNING状态,或者SHUTDOWN状态并且不是新添加任务的请求并且任务队列不为空,满足这两种情况的一种,才进行下一步;
- 到这一步,说明通过判断线程池状态校验通过。内部再开启一个死循环:首先校验线程数量,如果不符合规则,那么直接返回false。随后尝试预先CAS的将WorkerCount线程数量自增1,成功之后退出整个双层大循环,继续下一步。失败之后会重新获取ctl的值,然后判断线程状态是否改变,如果状态改变了那么进入下一次外层循环,再次进行状态校验如果状态没有改变,那么那么进入下一次内层循环,不断地循环重试CAS操作即可。
- 获取此时最新的线程数量wc。
- 线程数量校验:如果wc大于等于CAPACITY(最大线程数),或者wc大于等于匹配最大值边界(如果core参数为true就是corePoolSize,否则就是maximumPoolSize)。满足两种条件的一个,那么直接返回false,表示线程数量不符合要求。
- 到这一步,表示满足线程数量要求。那么尝试预先CAS的将WorkerCount线程数量自增1,即ctl值自增1,CAS操作成功之后,直接break retry跳出外层循环,这一步就算完成了,进入下一步尝试新增Worker。
- 到这一步返回跳出循环,表示CAS失败,重新获取最新的ctl值c。
- 继续判断如果线程池运行状态不等于rs,线程池的状态改变了,我们知道线程池的状态是不可逆的,那么continue retry结束内层循环,结束本次外层循环,继续下一次外层循环,将可能在下一次的外层循环中因为线程状态的原因而返回false 并退出。
- 到这一步还没跳出循环,表示CAS失败,并且线程池状态也没有改变,那么重新开始下一次内层循环重试CAS即可。
- 到这一步,表示线程池状态和线程数量校验通过,并且已经预先新增了WorkerCount线程数量的值,下面是尝试新增Worker并启动线程的逻辑。
- workerStarted表示新增的工作线程是否已启动的标志位,初始化为false,表示未启动。workerAdded表示新增的工作线程是否已加入workers集合,初始化为false,表示未加入。w表示需要新增的Worker对象,初始化为null。
- 开启一个try代码块:
- 新建一个Worker赋给w,传入firstTask参数,作为将要执行的第一个任务,在构造器中还会通过线程工厂初始化一个新线程。
- 获取w内部的新线程t。如果t不为null:
- 下面的步骤涉及到workers集合的改动以及新线程的执行,甚至其他参数比如largestPoolSize的改动,需要获取mainLock锁,成功之后进行下一步并开启一个try代码块:
- 获取此时的线程池运行状态值rs。
- 首先进行的是再次的检查线程池状态,因为可能在最上面的循环之后-获取锁之前,线程池的状态发生改变,比如被停止。,如果rs小于SHUTDOWN,即属于RUNNING状态,或者rs属于SHUTDOWN状态,并且firstTask为null(不是新增任务的请求)满足这两个条件中的一个,才可以真正的开启线程:
- 继续校验线程t是否是活动状态,因为如果线程已经处于活动状态,表示已经执行了start()方法,即已经开始执行了run方法。那么这个线程就不能再执行新任务,不符合要求,由调用方直接抛出IllegalThreadStateException异常。
- 上面的校验通过,那么将新建的Worker加入workers的set集合,这是blockingQueue自己的方法。
- 获取workers的数量s,就表示目前工作线程数量,如果s大于largestPoolSize,即大于历史最大线程数量,那么largestPoolSize更新为s。
- workerAdded置为true,表示新增的工作线程已加入workers集合。
- 最终,无论上面的代码是成功了还是发生了异常,都需要在finally中解锁mainLock。
- 解锁成功之后的代码。如果workerAdded为true,即新增的工作线程已加入workers集合,说明操作完全没问题,一切正常:
- 那么启动线程t,线程t将会执行Worker中的run方法。workerStarted置为true,表示新增的工作线程已启动。
- 最终,无论上面的try代码是成功了还是发生了异常,都走finally语句:
- 如果workerStarted为false,即新增的工作线程最终没能启动成功,那么调用addWorkerFailed对此前可能做出的改变进行“回滚”。否则finally中什么也不做。
- 返回新增的工作线程已启动的标志workerStarted,如果已启动线程,表示新增工作线程成功,否则表示新增工作线程失败。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2.1 addWorkerFailed添加Worker失败处理
addWorkerFailed方法仅仅在addWorker方法中被调用,表示添加Worker失败时的回滚操作,传递的参数就是代表新增Worker的w变量。
addWorkerFailed的详细步骤:
- 获取mainLock锁;
- 如果w不为null,即Worker被添加到了workers中,那么从workers中移除w;
- 调用decrementWorkerCount,循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 此时线程池可能不是RUNNING状态,那么调用tryTerminate方法尝试将线程池状态设置为TERMINATED彻底结束线程池。
- 最终解锁。
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
2.1.1 tryTerminate尝试终止线程池
tryTerminate方法在ThreadPoolExecutor的shutdown、shutdownNow、addWorkerFailed、remove、purge这几个方法中被调用,这几个方法被调用时,一般都是涉及到线程池状态改变、移除workers集合中的线程、移除任务队列中的任务的情况,该方法用于尝试终止线程池(将线程池状态将尝试转换为TERMINATED)。
tryTerminate的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 开启一个死循环,尝试终止线程池:
- 获取此时最新的ctl值c;
- 线程池状态校验:如果处于RUNNING状态,不能终止线程池;或者如果运行状态值大于等于TIDYING,不需要该请求去终止线程池;或者如果运行状态值等于SHUTDOWN,并且workQueue任务队列不为空,不能终止线程池。以上三个条件满足一种,即可立即返回。
- 到这里,表示:线程池位于STOP状态,或者线程池位于SHUTDOWN状态且workQueue任务队列为空,上面两种状态满足一种,表示都可以尝试关闭线程池,但是也不一定,需要继续判断。
- 如果workerCount线程计数不为0,那么不能终止线程池。此这里可能是SHUTDOWN状态,因为调用shutdown方法仅仅在interruptIdleWorkers方法中中断空闲的线程,可能存在正在运行的线程没有被中断;当然也有可能是STOP状态,即如果还有线程还没有启动(state为-1),那么该线程也不会在shutdownNow的interruptWorkers方法中被中断。
- 执行tryTerminate方法时,运行状态的线程(获取w锁)可能处于空闲状态了(释放w锁),此时主要是需要调用interruptIdleWorkers (ONLY_ONE)尝试对于空闲状态的线程传播SHUTDOWN、STOP状态,让它们都被中断并移除。随后返回。
- 到这里表示工作线程数为0,那么表示可以尝试更改线程池状态,进一步停止线程池。首先获取mainLock锁,因为下面需要唤醒在termination上等待的外部线程。
- 开启一个try代码块:
- 尝试将ctl的值CAS的从c更改为TIDYING状态的值,即转换为TERMINATED状态。如果CAS成功:
- 在另一个try块中执行terminated()钩子方法,该方法是空实现,可以由子类重写。
- 无论terminated是否抛出异常,最终都执行finally块:
- 将ctl的值CAS的从TIDYING更改为TERMINATED状态的值,即转换为TERMINATED状态,表示线程池被彻底停止。
- 唤醒所有调用awaitTermination并且还是处于等待状态的外部线程,通知线程池已经彻底关闭。
- 方法返回。
- 最终需要在finally块中解锁。
- 到这一步还没有返回,表示CAS操作更改TIDYING失败了,可能是其他线程操作成功了,因此继续下一次循环。如果其他线程操作成功,那么本次的tryTerminate操作将会在下一次循环中,因为判断线程池状态不满足而退出。这里我们能够看出来,线程池状态转换为TIDYING以及TERMINATED的过程是连续的,只会在一个线程的同一次tryTerminate方法调用中完成。
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
2.1.1.1 interruptIdleWorkers中断空闲线程
尝试中断空闲的线程,即没有获取Worker锁的线程,比如在等待任务的线程,以便它们在后续运行中可以检测到线程池停止状态(比如shutdown()或者shutdownNow()),或者配置被更改的信息(比如setMaximumPoolSize())。
shutdown()方法或者设置线程池信息的方法比如setCorePoolSize()、setMaximumPoolSize()或者tryTerminate等方法中会调用。
代码很简单,主要是理解onlyOne参数的意思,如果为true,那么最多中断一个Woker;如果为false,那么尝试中断所有空闲线程。代码中循环workers集合,如果当前Worker没有被中断,并且尝试获取Worker锁成功(此前没有获取Worker锁),那么就是中断该线程,随后根据onlyOne参数确定是否跳出集合。
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
3 Worker线程包装类
线程池中的一个工作线程就被包装为一个Worker的对象,实现了Runnable 接口,可以看作线程任务。
Worker还继承了AbstractQueuedSynchronizer(AQS),自己实现了简单的不可重入的独占锁,state=0 表示锁未被获取状态, state=l 表示锁己经被获取的状态。这里的实现的独占“锁”,实际上就是用来控制该线程是否可以被中断的,获取锁的线程可以看作是正在工作中的线程,没有获取到锁的线程可以看作空闲线程。
不可重入是为了保护正在执行任务的线程(已经获取到了锁)对应的的w锁不会被被其他比如setCorePoolSize、setMaximumPoolSize、shutdown等线程池控制方法再次获取并中断。
另外刚创建Worker时state状态则被设置为-l,这是为了避免该新增的Worker的线程在运行runWorker()方法前就被中断(包括shutdown和shutdownNow),即还没处理过任务是不允许被中断的。
它的构造器中会调用newThread方法,该方法需要传递一个线程任务给线程工厂,然后线程工厂将返回一个线程,该线程启动后会执行这个线程任务。可以看到这里传递的就是this,这个this代指该Worker对象本身,因为Worker实现了Runnable,因此实际上返回的thread在start启动之后,会执行对应Worker的run方法。
关于run方法以及与锁相关的方法介绍请看代码注释:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
public void unlock() {
release(1);
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public boolean isLocked() {
return isHeldExclusively();
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
3.1 runWorker执行工作
runWorker方法是Worker的run方法中调用的方法,实际上就是由工作线程执行任务的核心逻辑,到这里,这个方法的执行者将从execute的调用线程变成工作线程。
大概逻辑如下:
- 工作线程将会从此前的firstTask任务开始执行,如果没有,那么从任务队列通过getTask拉取任务来执行。
- 如果firstTask和getTask都返回null,那么表示该任务线程将会退出,对应的Worker将被清理。
- 在执行每一个任务之前,会获取对应的Worker锁,获取了w锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
- 每次都会调用任务前置方法beforeExecute,如果重写了该方法并且抛出了异常,那么任务不会再被执行,并且该线程及其Worker将被清理;每次任务被执行后,都会执行afterExecute后置方法,如果重写了该方法并且抛出了异常,那么该线程及其Worker将被清理。
runWorker的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 获取当前工作线程wt,获取第一个要执行任务w.firstTask赋给的task变量,随后w.firstTask置null,释放引用;
- 调用unlock“解锁”。这的解锁操作仅仅是将state设置为0,到此该线程将支持被interrupt中断;
- completedAbruptly表示线程执行过程中是否发生异常的标志位,初始化为true;
- 开启一个try代码块:
- 开启一个循环,下面就是线程不断循环处理任务的逻辑。循环条件是:task不为null,或者调用getTask方法,将的返回值赋给task,此task不为null,那么可以继续循环处理获取的任务;否则退出循环,该线程以及Worker将会被清理:
- 获取对应Worker锁,就是将state从0变成1,获取了锁就表示该线程处于工作状态,其他时候比如getTask就表示该线程处于空闲状态。此时该Worker锁不可以被比如setCorePoolSize、setMaximumPoolSize、shutdown等其他控制操作获取,即该工作线程此时不能被中断,但是shutdownNow方法除外。
- 进行一个判断的表达式操作。如果线程池状态值大于等于STOP,此时工作线程应该被无条件被中断,表达式结果为true;线程池状态值小于STOP,但是线程被中断了,那么恢复被中断的线程,将中断标志位改为false,表达式结果为false。
- 如果表达式为true,将当前线程中断,仅仅是设置中断标志位而已。
- 在一个try块中执行任务。首先执行前置方法beforeExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
- 使用thrown变量记录任务执行过程中抛出的异常。
- 在一个try块中, 调用task.run,到这里才是真正的执行任务。途中捕获各种异常赋给thrown。
- 无论task.run有没有抛出异常,都会在finally中执行后置方法afterExecute,默认空实现,自定义的子类可以实现自己的逻辑。如果抛出了异常,那么该任务将不会被执行,,该线程以及Worker将会被清理。
- 无论上面的代码有没有抛出异常,都会执行finally语句块:
- 将task变量置null,表示该任务执行完毕,无论是真的成功了还是失败了。
- 此Worker记录的完成任务数量completedTasks自增1
- 如果上面没有抛出异常,那么到此一个任务执行完毕,继续下一次循环,从任务队列中拉取任务执行!
- 到这一步跳出了循环,表示task为null,并且通过getTask重任务队列获取的任务也为null,即没有任务可执行了,循环结束,这是正常行为,将completedAbruptly置为false。
- 执行finally语句块的情况:可能是执行过程中抛出了异常,或者getTask返回null,而getTask返回null则可能是各种各样的情况,比如超时、线程池被关闭等。此时该线程和Worker将被清理!
- 调用processWorkerExit方法,传递Worker对象以及是否发生异常的标志位。processWorkerExit方法会将该Worker移除workers集合,并且根据completedAbruptly决定是否新建Worker添加到workers集合。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
3.1.1 getTask拉取任务
getTask方法的主要作用是从阻塞队列中拉取任务,是由工作线程调用的。将会在一个循环中拉取并返回一个任务,由于是阻塞队列,在没有任务时该方法会可能会超时阻塞或者一直阻塞。以下情况将会直接返回null:
- 具有超过maximumPoolSize的线程数量,可能在运行时动态的设置了maximumPoolSize,将maximumPoolSize调小了,此时需要丢弃部分工作线程;
- 线程池处于STOP及其之后的状态,无论还有没有任务,无条件清除全部工作线程;
- 线程池处于SHUTDOWN状态,且 workQueue 为空,队列中的任务已执行完毕,清除工作线程;
- 如果线程数大于corePoolSize,则对超过的线程在keepAliveTime超时之后还没获取到任务就会返回null,如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么对全部线程应用超时时间,这里返回null用于清除多余的工作线程,控制线程数量。
getTask的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- timedOut变量代表上一次的poll()操作是否超时的标志,初始化为fale,表示未超时;
- 开启一个死循环,相当于自旋,从任务队列中拉取任务:
- 获取此时的ctl值c,获取此时的运行状态rs;
- 检查线程池状态: 如果线程池状态值大于等于SHUTDOWN,并且(线程池状态值大于等于STOP,或者任务队列为空)。即如果线程池处于STOP及其之后的状态,或者线程池处于SHUTDOWN状态,且 workQueue 为空。那么表示线程池被关闭,且线程不可以继续执行下去。
- 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 返回null,getTask方法结束,随后该Worker会被清理。
- 到这一步,表示线程池状态符合要求,但是状态是可能随时变化的。获取线程数量wc。
- timed变量表示对工作线程是否应用超时等待的标志。如果allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),表示所有线程都应用超时等待,或者wc大于核心线程数量,那么可以对超过corePoolSize的线程应用超时等待。以上情况满足一种,timed即为true,否则为false。
- 校验线程数量以及是否超时: (如果wc大于最大线程数,表示在线程池运行时动态的将maximumPoolSize调小了,或者上一次的poll()操作已经超时,并且可以对工作线程应用超时等待),并且(如果wc大于1,或者任务队列为空)。
- 这两种情况都满足,表示就可能会返回null。尝试CAS的将ctl的WorkerCount线程数量部分自减1;成功之后返回null,随后该Worker会被清理,getTask方法结束。
- CAS失败之后,继续下一次循环,即这种关闭不是需要立即完成的。因为没有使用锁,可能有多个线程同时超时,此时需要控制因为超时返回的线程数量满足要求,比如某一批CAS成功的超时线程返回之后,其他CAS失败的超时线程在下一次循环时就可能因为数量不达标不会应用超时了。这里也是为什么本次的超时操作要等待到下一循环次才可能返回的原因,因为每次循环都会获取最新的wc和timed值。
- 到这一步,表示线程池的状态、线程数量、以及超时时间满足要求,开始真正的拉取任务。开启一个try代码块:
- 判断timed是否为true,如果为true,那么需要对线程应用超时等待,调用workQueue的超时poll方法,在超时时间范围内等待获取并移除任务(队头),如果超时时间没有获取道任务,那么返回null;如果为false,那么不需要对线程应用超时等待,调用workQueue的take方法,获取并移除任务(队头),没有获取到任务将会一直等待。返回值使用r变量接收。
- 到这一步,表示拉取的方法返回了,如果返回值r不为null,表示拉取到了任务,那么返回该任务,getTask方法结束。
- 到这一步还没有返回,表示没有拉取到任务,属于等待超时,那么timedOut设置为true。继续下一次循环,下一次循环中将可能会返回null,也可能不会。
- 使用catch捕获try块中抛出的InterruptedException。如果捕获到了,那么timedOut设置为false,此时不算拉取超时,继续下一次循环。在shutdown或者其他调整核心参数的方法关闭空闲线程的时候,就是设置中断标志,会将等待的线程唤醒,下一次循环中该空闲线程可能会由于线程状态的原因而返回。如果发生了其他异常,那么由于不能捕获异常而,直接退出该方法,getTask方法结束,随后该Worker会被清理。
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3.1.2 processWorkerExit清理/补充Worker
当在runWorker的任务执行过程中抛出异常,或者getTask返回null,getTask返回null,则可能是各种各样的情况,比如超时、比如线程池被关闭等,那么这个Worker被丢弃,此时需要调用processWorkerExit方法对于无用的Worker进行清理,该方法仅仅会被Worker对象的线程调用,即也是由工作线程调用的。
processWorkerExit方法需要传递Worker对象以及是否发生异常的标志位completedAbruptly。该方法会将参数Worker移除workers集合,并且根据线程池状态以及completedAbruptly决定是否补充新Worker。
在移除Worker之后,如果此时线程池状态为RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者不是应为抛出异常而被停止,但此时的线程数量小于最小线程数min。这两情况都会补充一个新Worker。 这就是我们常说的:线程池中的线程抛出异常之后,会自动补充开启一个线程的原理,现在我们也能明白不是无条件开启的。
processWorkerExit的详细步骤如下,看的时候建议结合更详细的代码注释一起看:
- 如果completedAbruptly为true,表示因为异常才调用该方法,此时workerCount还没有来得及自减1,这里需要补上:
- 循环尝试CAS的将ctl的WorkerCount线程数量部分自减1,直到成功为止。
- 获取mainLock锁;
- 更新线程池已完成的任务数量,当前的值加上终止线程记录的任务执行数量,只有在某个Worker工作线程终止时才会更新。
- 从workers集合中移除该Worker。
- 最终解锁。
- 此时线程池可能不是处于RUNNING状态,调用tryTerminate尝试彻底终止线程池。
- 获取此时的ctl值c。如果线程池状态值小于等于STOP,即处于RUNNING或者SHUTDOWN状态,那么可能需要补充线程。
- 继续判断如果completedAbruptly为false,表示不是因为抛出异常被停止。
- 使用min变量表示线程池需要保持的最小线程数。如果设置了allowCoreThreadTimeOut为true(设置成功的要求是超时时间大于0),那么表示对所有线程应用超时,那么min=0;如果allowCoreThreadTimeOut为fasle,那么表示对超过核心线程数量的线程线程应用超时,那么min=corePoolSize。
- 如果min为0,并且任务队列不等于空,那么最小线程数应该为1,因为此时需要至少一个线程去执行任务队列中的任务。min设置为1。
- 如果线程数量大于等于最小线程数min,那么不需要补充线程,直接return结束方法;否则表示不需要补充线程,进入下一步。
- 到这一步,表示:处于RUNNING或者SHUTDOWN状态,并且是因为抛出异常而被停止,或者处于RUNNING或者SHUTDOWN状态,并且此时的线程数量小于最小线程数min。这两种情况满足一种,即需要补充一个Worker,那么调用addWorker方法尝试新增一个Worker。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
|