项目中经常会用到线程池来解决一些异步任务,本篇则主要从源码角度来分析一下其执行的过程。
使用过程如下:
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
threadSize+1,
threadSize+1,
10,
TimeUnit.SECONDS
,new SynchronousQueue<>());
一、背景与介绍
在Java中异步任务的处理,我们通常会使用Executor框架,而ThreadPoolExecutor是JUC为我们提供的线程池实现。
1)何为线程池?
“线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控
2)什么时候使用?
3)优点
- 降低创建线程和销毁线程的性能开销
- 提高响应速度,当有新任务需要执行是不需要等待线程创建就可以立马执行
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
二、executor
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。
1)继承与实现UML图
2)源码
我们可以看到,就一个execute方法
public interface Executor {
void execute(Runnable command);
}
三、ExecutorService
由UML图我们可以看到ExecutorService起到了至关重要的作用。它定义了线程池的具体行为。
1)execute:履行Ruannable类型的任务
void execute(Runnable command);
2)submit:可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象
Future submit(Callable task); Future<?> submit(Runnable task);
3)shutdown:在完成已提交的任务后封闭办事,不再接管新任务
void shutdown();
4)shutdownNow:停止所有正在履行的任务并封闭办事
List shutdownNow();
5)isTerminated:测试是否所有任务都履行完毕了
boolean isTerminated();
6)isShutdown:测试是否该ExecutorService已被关闭
boolean isShutdown();
四、线程池的具体实现
- ThreadPoolExecutor 默认线程池
- ScheduledThreadPoolExecutor 定时线程池
以默认线程池讲解源代码
1)任务提交两种方式
- 1、public void execute() //提交任务无返回值
- 2、public Future<?> submit() //任务执行完成后有返回值
2)核心参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
① corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行; 如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
② maximumPoolSize
注:最大线程数=核心线程数+非核心线程数
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;
③ keepAliveTime
线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime;
④ unit
keepAliveTime的单位;
⑤ workQueue
用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
- 1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
- 2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
- 3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
- 4、priorityBlockingQuene:具有优先级的无界阻塞队列;
⑥ threadFactory
它是ThreadFactory类型的变量,用来创建新线程。默认使用 Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。
⑦ handler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
- 1、AbortPolicy:直接抛出异常,默认策略;
- 2、CallerRunsPolicy:用调用者所在的线程来执行任务;
- 3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- 4、DiscardPolicy:直接丢弃任务;
上面的4种策略都是ThreadPoolExecutor的内部类。 当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
五、线程池的生命状态
1)5种生命状态
-
running 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。 线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0! -
shutdown RUNNING状态下调用shutdown方法后进入此状态。此状态下线程池不接受新任务,但会处理队列中等待的任务。 -
stop RUNNING/SHUTDOWN状态下调用shutdownNow方法后进入此状态。此状态下线程池不接受新任务,也不处理既有等待任务,并且会中断既有运行中的线程。 -
tidying SHUTDOWN/STOP状态会流转到此状态。此时所有任务都已运行完毕,工作线程数为0,任务队列都为空。从字面角度理解,此时线程池已经清干净了。 -
terminated 线程池彻底终止,就变成TERMINATED状态。
2)何为ctl? COUNT_BITS?CAPACITY?
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
① 何为ctl?
ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它的类型为AtomicInteger。 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。
理解:高3位保存runState,低29位保存workerCount
首先明白int占位情况:
(signed) int (有符号整形, 32位机器占四个字节32bit位) 最小值 1000 0000 0000 0000 0000 0000 0000 0000 [2^31](- 2147483648) 每个类型的这种形式都是有符号数的最小值 最大值 0111 1111 1111 1111 1111 1111 1111 1111 [2^31-1] (2147483647) 此时最大值要是再加一,结果就 是-2147483648
unsigned int (无符号整形, 32机器占四个字节32bit位) 最小值 0000 0000 0000 0000 0000 0000 0000 0000(0) 最大值 1111 1111 1111 1111 1111 1111 1111 1111 [2^32-1] (4294967296) 此时最大值要是再加一,结果就是0
② COUNT_BITS
COUNT_BITS Integer.SIZE-3 也就是 29
③ CAPACITY
CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。 直接可以走一下程序计算,我算出来的1<<29的value是536870912
3)ctl相关方法
//获取运行状态;
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取活动线程数;
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取运行状态和活动线程数的值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
4)runState is stored in the high-order bits
位的运算:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
该怎么理解如上的代码呢?我们可以通过高三位来弄个表格。
周期状态 | 高三位表示 | value |
---|
RUNNING | 111 | -1<<29 =-536870912 | SHUTDOWN | 000 | 0<<29 =0 | STOP | 001 | 1<<29=536870912 | TIDYING | 010 | 2<<29=1073741824 | TERMINATED | 011 | 3<<29=1610612736 |
六、源码解析
1)execute
① 源码解析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
② 简单图解说明
③ 文字描述法
文字理解来说,在执行execute()方法时如果状态一直是RUNNING时的执行过程如下: (状态必须是RUNNING,工作运行状态,才接收任务哈) 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务; 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中; 如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务; 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。 这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。 所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
④ 详细图解法
⑤ 生活实例讲解线程池的一个流程
其实最实际的例子就是目前互联网公司,比如我们公司有一个自研的任务,我目前需要5个正式员工来进行任务开发,计划一周交公。当产品来任务后,会优先交给5个正式员工安排,然后如果手里有任务,会将任务放到这一周的其他天数里,当7天的安排都安排满了,得再想办法,那就得考虑外包人员了,来辅助完成任务,但是外包人员预支公司也是有限制的,所以当外包人员工作也满了,那就只能考虑拒绝任务了。
2)reject 拒绝策略流程
private volatile RejectedExecutionHandler handler; 接口有四个实现类: 四个实现类都是ThreadPoolExecutor里的静态内部类
① 为何加volatile关键字?
因为在执行过程中有可能会重新设置拒绝策略,要保证可见性
② 默认的是:AbortPolicy,直接往外抛异常操作
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
③ CallerRunsPolicy
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
我们看if(!e.isShutdown())判断当前线程池是否关闭?如果还没有关闭,则用当前提交任务的线程去执行。 怎么理解呢?就好比,你是一个秘书,当前手里有一个任务需要交给领导去处理,结果领导都好忙,领导就委托给你,让你处理了。哈哈。
④ DiscardOldestPolicy
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
我们看e.getQueue().poll(); 是丢弃老的任务,然后执行新进来的任务。
3)addWorker方法(创建线程,招聘的概念)
addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
t.start();其实就是调用的worker的run方法,因为传入的是自己本身,this
4)Worker类 (招聘的员工)
Class Worker 主要维护线程运行任务的中断控制状态,以及其他次要的簿记。 此类机会主义地扩展 AbstractQueuedSynchronizer 以简化获取和释放围绕每个任务执行的锁。 这可以防止旨在唤醒等待任务的工作线程而不是中断正在运行的任务的中断。 我们实现了一个简单的不可重入互斥锁而不是使用 ReentrantLock,因为我们不希望工作任务在调用 setCorePoolSize 等池控制方法时能够重新获取锁。 此外,为了在线程真正开始运行任务之前抑制中断,我们将锁定状态初始化为负值,并在启动时将其清除(在 runWorker 中)。
简单理解: Worker类继承了AQS,并实现了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
① 构造方法
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
在addWorker中,调用构造方法时,需要把任务传入,这里通过 getThreadFactory().newThread(this); 来新建一个线程, newThread方法传入的参数是 this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。 注意理解:创建线程的时候,传入的this,其实就是传入的员工自己,本身就是一个线程,得一直为公司工作
② firstTask和thread属性
firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
③ run方法
在addWorker中,如下,t.start其实就是调用的Worker里的run方法 if (workerAdded) { t.start(); workerStarted = true; }
public void run() {
runWorker(this);
}
runWorker(this);又再次把自己传入,调用的线程池的run,内部类调用外部类的实现。 为什么线程可重入?就是外部类runWorker,通过while循环判断,然后从阻塞队列获取任务。
while (task != null || (task = getTask()) != null) {
理解就是我公司五个正式员工,有的员工开发速度快,所以领取任务就快,阻塞队列(排期)空闲出来,就可以再往里加任务,这样不就可以少用外包人员了吗!给公司省钱啊,哈哈。
5)runWorker方法
① 源码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
② 执行过程中,某个任务异常了,剩下的任务还会执行吗?
public class ThreadPoolExecutorService {
private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(
1,
1,
10,
TimeUnit.SECONDS
, new ArrayBlockingQueue<>(5));
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
int finalI = i;
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println("task:" + Thread.currentThread().getName());
if (finalI == 0) {
throw new RuntimeException("error");
}
}
}, i);
}
}
}
如上代码,我执行第一个任务的时候,因为某种原因抛异常了,剩下的任务还能执行吗?答案是可以的,代码就在runWorker里,
completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } 若走了processWorkerExit方法,那么上边的completedAbruptly = false;铁定是没走的,所以completedAbruptly = true,然后看processWorkerExit就会发现,继续走了addWorker方法去执行任务。
③ runWorker一个整体的执行流程
- while循环不断地通过getTask()方法获取任务;
- getTask()方法从阻塞队列中取任务;
- 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
- 调用task.run()执行任务;
- 如果task为null则跳出循环,执行processWorkerExit()方法;
- runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly 变 量 来 表 示 在 执 行 任 务 过 程 中 是 否 出 现 了 异 常 , 在 processWorkerExit方法中会对该变量的值进行判断。
6)getTask方法
private Runnable getTask() {
boolean timedOut = false;
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :阻塞队列,当队列为空,条件不满足,被阻塞。想想什么意思呢?阻塞队列都为空了,那么就是没有多少任务可干了,那就只能各回各家各找各妈哈。
七、总结
1)整体流程图解
2)使用线程池的一些注意
1、创建的线程并没有标明哪个是核心线程,反正都是员工,能干活就行。只是通过设定的数量来判断的。 2、在使用的过程中,如果不想任务被丢弃,那就可以考虑选择一个无界队列,其实就是咱们说的,一周时间不够,延长开发时间啊,哈哈哈。 3、关于任务的丢弃问题,真的得根据业务进行选择,可以采用中间件来辅助,进行重写拒绝策略,实时监控阻塞队列的使用情况,必要可以重新放回队列进行执行。 4、线程池合理的线程数你是如何考虑的?
CPU核数= Runtime.getRuntime().availableProcessors() 第一种:CPU密集型( 大量计算,cpu 占用越接近 100%, 耗费多个核或多台机器):核心线程数 = CPU核数 + 1 第二种:IO密集型(大量网络,文件操作):核心线程数 = CPU核数 * 2 具体业务具体分析,比如现在CPU是10%,代码一执行,CPU狂升,那也就是CPU密集型 分为CPU,内存,磁盘,使用线程池,是想更多的利用起CPU。
|