? ? ? ?线程池大家都不陌生,但每次谈起又有陌生的感觉,归根结底是因为对它了解的不够深入,所以本文希望通过源码阅读的方式带给大家更深入的理解。首先我们先思考两个问题:
1. 为什么要有线程池?
(1)线程的创建和注销有会消耗系统资源,所以我们在线程池中让一定数量的线程保持活跃状态,避免不必要的系统开销,同时多线程并发执行,提高系统的响应速度。
(2)我们可以对线程的生命周期做统一化管理,可以对线程的使用做额外的扩展功能,对线程使用更好的管理起来。
2. 线程池的实现逻辑是什么?
(1)线程如何创建(ThreadFactory)?默认多少数量的线程(corePoolSize)保持活跃?超过默认活跃线程后,线程存储到哪里(BlockingQueue)?队列满了以后系统能支持开启线程最大数是多少(maximumPoolSize)?线程数量超过系统支持最大负载后怎么办(RejectedExecutionHandler 拒绝策略)?线程执行结束后如何结束线程(keepAliveTime unit)?
(2)线程如何在内存中存储?使用HashSet
根据以上讨论我们得出结论,线程池的作用是要管理线程的生命周期,并提供给调用方对应的执行结果。
? ? ? JDK的Excutor框架对线程池的实现做了规范化输出,所谓要创建优秀的框架先要了解优秀的框架,我们看下Excutor基本类图:
?Excutor接口:最上层约束子类要对线程Runnable执行excute接口。
ExcutorService接口:开始定义生命线程生命周期接口,包括:提交,执行,中断等。
AbstractExcutorService抽象类:约束了不同参数的submit执行流程,并对线程池执行返回结果invoke方法做了约束。
ThreadpoolExecutor类:具体实现线程池逻辑。
1. 如何定义线程池的生命周期?
答:通过ctl成员变量,使用32位int类型定义线程池状态(前三位)和线程数量(后29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//状态&线程数量标记位,因为线程池生命周期经常变化所以要保证原子性
private static final int COUNT_BITS = Integer.SIZE - 3; //线程数量位数:29位
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //线程数量最大容量,1左移29位-1是29个1,最大为2的30次方-1
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; //running状态:111+29个0,运行状态
private static final int SHUTDOWN = 0 << COUNT_BITS; //shutdown状态:32个0,表示不可以创建线程,阻塞队列仍然执行,并且会中断正在处理的任务
private static final int STOP = 1 << COUNT_BITS; //stop状态:001+29个0,不可以创建线程,阻塞队列停止
private static final int TIDYING = 2 << COUNT_BITS; //tidying状态:010+29个0,ctl的线程池数量清零,并执行钩子函数terminated()
private static final int TERMINATED = 3 << COUNT_BITS; // terminated状态:011+29个0,线程池彻底终止
状态流转图:
?2. execute&addwork方法解读:
excute执行流程:
?
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get(); //获取状态&线程数量记录标记
if (workerCountOf(c) < corePoolSize) { //如果小于核心线程数,则直接创建线程执行
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //线程池在运行状态,阻塞队列添加任务成功
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //再次检查不在运行状态,移除任务
reject(command); //拒绝任务
else if (workerCountOf(recheck) == 0) //如果工作线程数量为0
addWorker(null, false); //
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 获取运行状态
int rs = runStateOf(c);
/*
* 这个if判断
* 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
* 接着判断以下3个条件,只要有1个不满足,则返回false:
* 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
* 2. firsTask为空
* 3. 阻塞队列不为空
*
* 首先考虑rs == SHUTDOWN的情况
* 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
* 然后,如果firstTask为空,并且workQueue也为空,则返回false,
* 因为队列中已经没有任务了,不需要再添加线程了
*/
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
// 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
// 如果为false则根据maximumPoolSize来比较。
//
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试增加workerCount,如果成功,则跳出第一个for循环
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果增加workerCount失败,则重新获取ctl的值
c = ctl.get(); // Re-read ctl
// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据firstTask来创建Worker对象
w = new Worker(firstTask);
// 每一个Worker对象都会创建一个线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN表示是RUNNING状态;
// 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
// 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// workers是一个HashSet
workers.add(w);
int s = workers.size();
// largestPoolSize记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
? ? ? ? 具体线程执行类:worker,实现了Runnable接口同时继承AQS,提供线程能力,同时通过AQS控制线程状态。
final Thread thread; //具体执行线程
Runnable firstTask; //记录首次执行的任务
volatile long completedTasks; //完成的任务数量
Worker(Runnable firstTask) {
setState(-1); //避免被阻断直到runwork
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); //允许被终端,任务没有真正执行前可以中断
boolean completedAbruptly = true;
try {
//内部循环,task不为空,或者从队列中获取任务不为空
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 线程池处于中断或者 线程处于中断流程,word线程没有中断,则中断worker线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); //开放接口,在线程执行前用户可以自定义额外流程
Throwable thrown = null;
try {
task.run(); //启动任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); //开放接口,线程执行结束前自定义额外流程
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) //如果是突然完成,则减少现在工作的线程数量
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks; //完成数量+1
workers.remove(w); //从workers中移出
} finally {
mainLock.unlock();
}
tryTerminate(); //尝试修改线程池状态:terminated
int c = ctl.get(); //获取线程池标记
if (runStateLessThan(c, STOP)) { //如果线程池状态Running
if (!completedAbruptly) { //不是异常退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //判断核心线程是否处于空闲
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min) //工作的线程数量大于最少的工作线程数量
return; // 替换掉不需要的
}
//是异常退出则添加一个没有任务的woker,直接从队列中获取任务执行
addWorker(null, false);
}
}
? ??
|