线程池
好处
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:使用线程池可以对线程进行统一的分配、调优和监控,避免线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
- 解耦:线程池将任务的提交和任务的执行进行解耦,用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象,将任务的运行逻辑提交到Executor中,由Executor完成线程的调配和任务的执行部分;同时线程池在内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不直接关联,从而良好的缓冲任务,复用线程。
生命周期
线程池内部使用一个AtomicInteger变量clt维护两个值:运行状态(runState)和线程数量 (workerCount),高3位保存runState,低29位保存workerCount。
线程池的五种运行状态:
- RUNNING:能够接收新任务,以及对已添加的任务进行处理。
- SHUTDOWN:不接收新任务,但能处理已添加的任务,并且会去中断所有没有正在执行任务的线程。
- STOP:不接收新任务,不处理已添加的任务,并且会尝试中断正在处理任务或暂停任务的线程。
- TIDYING:所有的任务都已经终止,workerCount为0。
- TERMINATED:在terminated()方法执行完以后进入该状态。
任务调度
当用户向线程池提交一个任务后,执行过程如下:
- 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。
- 如果workerCount < corePoolSize,则创建并启动一个线程(充当核心线程)来执行新提交的任务,即使此时有其它线程空闲,但还是会去创建一个新的线程。
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。
- 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
- 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列,这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空;当队列满时,存储元素的线程会等待队列可用。
- ArrayBlockingQueue:数组实现的有界阻塞队列,按照先进先出FIFO的顺序来对元素进行排序,支持公平和非公平(默认)的方式来访问队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,队列的默认和最大的长度为Integer.MAX_VALUE,按照先进先出FIFO的顺序来对元素进行排序。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列,支持延时获取元素。
- SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待一个一个take操作,否则不能继续添加元素,支持公平和非公平(默认)的方式来访问队列。
- LinkedTransferQueue:由链表结构组成的无界阻塞队列,相比于其它的阻塞队列,多了tryTransfer和transfer方法。transfer方法:如果当前由消费者正在等待接收元素则可以把生产者传入的元素立刻传给消费者,如果没有则会将元素添加到队列尾部并等待到被消费者消费了才会返回。tryTransfer方法:尝试将生产者传入的元素直接传给等待的消费者则直接返回false,该方法也支持等待一段指定的时间。
- LinkedBlockingDeque:由链表结构组成的双向阻塞队列,在多线程同时入队时可以减小竞争。
实现原理:通过查看ArrayBlockingQueue的源码可知阻塞队列的实现是基于等待通知模式来实现的,其内部有一个ReentrantLock以及属于该Lock的两个Condition等待队列,在进行put和take操作时都会去加锁,当队列满时put或者是队列空时take的线程会分别在两个Condition上等待,直到消费者消费一个或者是生产者创建一个元素时唤醒等待线程。
线程池的创建
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
workQueue, threadFactory, handler)
- corePoolSize:核心线程数量,会一直存在,不会被回收。
- maximumPoolSize:线程池允许的最大线程池数量。
- keepAliveTime:数量超过corePoolSize的线程的最大空闲时间,超过该时间后会回收非核心线程。
- unit:超时时间的单位。
- workQueue:任务队列,用于保存等待被执行的任务的阻塞队列。
- threadFactory:创建线程的工厂类。
- handler:拒绝策略,当线程池和队列都满了时处理新提交的任务的策略,默认是丢弃任务并抛出异常。
拒绝策略
-
AbortPolicy策略:该策略会直接抛出异常。 -
CallerRunsPolicy策略:使用调用者所在线程(提交任务的线程)来运行任务。 -
DiscardOleddestPolicy策略:该策略将丢弃最老的一个任务,也就是即将被执行的任务,并尝试再次提交当前任务。 -
DiscardPolicy策略:该策略默默的丢弃无法处理的任务,不予任何处理。
参数配置策略
策略一:CPU密集型任务的线程数配置的少点,如CPU核数+1;IO密集型任务的线程数多点,如2*CPU核数。
策略二:线程池的大小=CPU核数×CPU利用率×(1+W/C),其中W/C为等待时间和计算时间的比率。
提交任务
- execute(Runnable command):用于提交不需要返回值的任务。
- submit(task):用于提交需要返回值的任务,该方法会返回一个Future类型的对象,通过调用get()方法可以阻塞的获取返回的结果。
线程管理
线程池创建线程时,会将线程封装成工作线程Worker,Worker继承自AQS,使用AQS实现了不可重入的独占锁的功能。同时Worker实现了Runnable接口,它的run方法的主要逻辑如下:
try {
while (task != null || (task = getTask()) != null) {
}
} finally {
processWorkerExit(w, completedAbruptly);
}
线程池内部使用一个Hash表(HashSet)来持有所有线程的引用,这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期。注意:线程池会将长时间未运行的空闲线程的引用从Hash表中移除,但是具体的回收任务是由GC完成的。
如何判断一个线程是否是空闲的?可以使用tryLock方法来判断线程池中的线程是否是空闲状态!lock方法一旦获取了独占锁,表示当前线程正在执行任务中,则不应该中断线程;如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。
常见线程池
- newFixedThreadPool:线程数是固定的,核心线程数=最大线程数=传入的参数n;keepAliveTime为0;阻塞队列为LinkedBlockingQueue,队列大小为Integer.MAX_VALUE。
- newSingleThreadExecutor:核心线程数=最大线程数=1,单线程串行处理所有任务;阻塞队列为LinkedBlockingQueue,队列大小为Integer.MAX_VALUE。
- newCachedThreadPool:核心线程数为0,最大线程数为Integer.MAX_VALUE,阻塞队列使用的是SynchronousQueue,所有的线程都直接去队列里拿任务,如果在超时时间内拿到任务就执行,否则该线程被回收。如果提交任务的速度大于线程池的处理速度则会不断地去创建线程。
- newScheduledThreadPool(corePoolSize):支持定时任务和周期性任务,核心线程数为corePoolSize,最大线程数为Integer.MAX_VALUE,阻塞队列为DelayQueue。调用scheduleWithFixedDelay方法提交定时任务,调用scheduleAtFixedRate方法提交周期性任务,而DelayQueue内部封装了一个优先级队列,他会对队列里的任务根据time进行排序,time小的排在前面先被执行。对于周期性任务,执行线程从队列中取出任务并执行后会修改下一次的执行时间,然后重新放回到队列里。
|