1.线程池的介绍
1.1 线程池的概念
线程池:Java中开辟出了一种管理线程的概念,这个概念叫做线程池,使用池的技术可以降低资源消耗(常见的池技术有jdbc连接池,线程池,内存池,对象池)。
1.2 为什么要提出线程池的概念
程序运行的本质是cpu进程的调度(占用系统的资源)进程是一个动态的过程,是一个活动的实体。简单来说,一个应用程序的运行就可以被看做是一个进程,而线程,是运行中的实际的任务执行者。可以说,进程中包含了多个可以同时运行的线程。而在Java中,内存资源是极其宝贵的,所以,我们就提出了线程池的概念。
1.3 线程池的好处
(1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
(2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
2. 线程池的使用
2.1 线程池的创建
我们可以通过ThreadPoolExecutor来创建一个线程池。 ThreadPoolExecutor类是线程池的核心实现类,用来执行被提交的任务。其继承关系如下。
下面我们来分析一下ThreadPoolExecutor类我们来看看ThreadPoolExecutor类的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
从源码中我们发现ThreadPoolExecutor有四个构造方法,所以有四种创建方法,在四个构造函数中一共有七个参数下面来对七个参数进行说明。
2.2 线程池的七大参数
(1)、corePoolSize(核心线程数,必需指定):
线程池的基本大小。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于基本线程时就不再创建。如果调用了线程池的prestartAllCoreThreads() 方法,线程池会提前将核心线程创建并启动。默认情况下,核心线程会一直存活,但是当将 allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
(2)、maximumPoolSize(必需指定,线程池最大数量):
线程池所能容纳的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会创建新的线程执行任务。不过要注意,如果使用了无界的任务队列这个参数则没有任何效果,因为线程创建要等到队列满了才会创建。
(3)、keepAliveTime(必需指定,线程活动保持的最长时间):
线程池的工作线程空闲后,保持的存活时间,所以如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。如果超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout 设置为 true 时,核心线程也会超时回收。
(4)、TimeUnit(必需指定):
指定keepAliveTime 参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。
(5)、BlockingQueue (任务队列,必需指定):
用于保存等待执行的任务的阻塞队列,通过线程池的 execute() 方法提交的 Runnable 对象将存储在该参数中。
阻塞队列的阻塞分两种情况:
- 当队列满的时候去写会阻塞
- 当队列为空的时候去读也会阻塞
BlockingQueue:四组API
功能 | 抛出异常 | 不抛出异常且有返回值 | 阻塞等待 | 超时等待 |
---|
添加 | add() | offer() | put() | offer(…) | 移除 | remove() | poll() | take() | poll(…) | 检查队首元素 | element() | peek() | - | - |
public static void test1(){
System.out.println(queue.add("a"));
System.out.println(queue.add("b"));
System.out.println(queue.add("c"));
System.out.println(queue.add("d"));
System.out.println("---------------------");
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.remove());
System.out.println(queue.element());
}
public static void test2(){
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d"));
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.poll());
System.out.println(queue.peek());
}
public static void test3()throws Exception{
queue.put("a");
queue.put("b");
queue.put("c");
queue.put("d");
queue.take();
queue.take();
queue.take();
queue.take();
}
public static void test4()throws InterruptedException{
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d"));
System.out.println(queue.offer("d",2,TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
System.out.println(queue.poll(2, TimeUnit.SECONDS));
}
任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在 Java 中需要实现 BlockingQueue 接口。该接口的实现子类如下: ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列),此队列按照先进先出原则对元素进行排序。
LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,也是按照先进先出排序元素,吞吐量要高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool使用了这个队列。在未指明容量时,容量默认为 Integer.MAX_VALUE。
PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现 Comparable 接口也可以提供 Comparator 来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。
DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现 Delayed 接口,通过执行时延从队列中提取任务,时间没到任务取不出来。
SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用 take() 方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用 put() 方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。
public class TestSynchronousQueue {
public static void main(String[] args) {
BlockingQueue<String> queue=new SynchronousQueue<>();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" put a");
queue.put("a");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+" put b");
queue.put("b");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"product").start();
new Thread(()->{
try {
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"=>"+ queue.take());
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"=>"+ queue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
},"consumer").start();
}
}
结果:
product put a
consumer=>a
product put b
consumer=>b
LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样 FIFO(先进先出),也可以像栈一样 FILO(先进后出)。
LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的结合体,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行为一致,但是是无界的阻塞队列。
注意:
有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置maximumPoolSize 没有任何意义。
(6)、ThreadFactory(线程工厂,可选):用于指定为线程池创建新线程的方式,还可以为每个创建出来的线程设置线程名。使用开源框架guava提供的ThreadFactoryBuilder可以快速的设置。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
(7)、 RejectedExecutionHandler(拒绝策略,可选):当达到最大线程数时需要执行的饱和策略。
当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现 RejectedExecutionHandler 接口,并实现 rejectedExecution(Runnable r, ThreadPoolExecutor executor) 方法。在ThreadPoolExecutor 类中实现了 4 种拒绝策略:
AbortPolicy(默认):丢弃任务并抛出 RejectedExecutionException 异常。
public class Demo1 {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 9; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
}
测试结果:
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-2
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threadpool.demo2$$Lambda$14/0x0000000100066840@71be98f5 rejected from java.util.concurrent.ThreadPoolExecutor@6fadae5d[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 3]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
at threadpool.demo2.main(demo2.java:16)
CallerRunsPolicy:由调用线程处理该任务。
public class Demo2 {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 9; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
}
超过了由主线程来处理
pool-1-thread-1
pool-1-thread-3
main
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-2
pool-1-thread-4
pool-1-thread-5
DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。
public class Demo3 {
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy());
for (int i = 0; i < 9; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
}
丢掉了一个任务
pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-4
pool-1-thread-3
pool-1-thread-1
pool-1-thread-5
pool-1-thread-2
DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。优化版
public class Demo4{
public static void main(String[] args) {
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < 12; i++) {
poolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
}
}
pool-1-thread-1
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
下面看一张图来更好的理解线程池和总结一下这几个参数 由图,我们可以看出,线程池中的corePoolSize就是线程池中的核心线程数量,这几个核心线程,就算在没有用的时候,也不会被回收,maximumPoolSize就是线程池中可以容纳的最大线程的数量,而keepAliveTime,就是线程池中除了核心线程之外的其他的最长可以保留的时间,因为在线程池中,除了核心线程即使在无任务的情况下也不能被清除,其余的都是有存活时间的,意思就是非核心线程可以保留的最长的空闲时间,而unit,就是计算这个时间的一个单位,workQueue,就是等待队列,任务可以储存在任务队列中等待被执行,执行的是FIFIO原则(先进先出)。threadFactory,就是创建线程的线程工厂,最后一个handler,是一种拒绝策略,我们可以在任务满了之后,拒绝执行某些任务。
2.3 如何向线程池提交任务
可以使用两个方法向线程池中提交任务分别为execute()和submit()方法。
1.execute()方法
execute()方法用于提交不需要返回值的任务,由源码知道是执行Runnable任务,所以无法判断任务是否被线程池执行成功。
2.submit方法
submit方法用于提交一个需要返回值的任务。线程池会返回一个future类型的对象,通过这个对象我们可以判断任务是否执行成功,并且可以通过future对象的get方法来获取返回值。
2.4 如何关闭线程池
我们可以通过调用线程池的shutdown或者shutdownNow方法来关闭线程池,这两个方法都是遍历线程池中的所有工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是他们有一定的区别
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
我们shutdown的源码可以知道shutdown只是将线程池的状态设置成了SHUTDOWN状态,然后中断所有没有在执行任务的线程。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并且返回等待执行任务的列表。
当所有的任务都关闭了才表示线程池关闭,我们可以调用isTerminaed方法来判断线程池是否关闭成功。我们通常调用shutdown来关闭线程池,如果任务不一定要求执行完就关闭线程池那么我们可以使用shutdownNow来关闭线程池。
2.5 如何合理的使用线程池
要合理的使用线程池我们就要知道如何合理地配置线程池和如何监控线程池,要合理地配置线程池我们就必须要分析任务的特性。
合理地配置线程池
我们可以从一下角度来分析:
- 任务的性质:cpu密集型任务,IO密集型任务和混合型任务
- 任务的优先级:高中低
- 任务的执行时间:长中短
- 任务的依赖性:是否依赖其他资源比如数据库连接
性质不同的任务可以用不同规模的线程池分开处理。
- CPU密集型任务应配置尽可能小的线程,如配置Ncpu+1个线程的线程池。
- 由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。
- 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。
可以通过Runtime .getRuntime().availableProcessors() 方法获得当前设备的CPU个数。
- 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。
- 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。
- 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
线程池的监控
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便出现问题时快速定位查找问题。可以通过线程池提供的参数进行监控,在监控线程池的时候可以使用以下属性。
taskCount :线程池需要执行任务的数量。
completedTaskCount :线程池在运行过程中已完成的任务数量,小于或等于taskCount。
largestPoolSize :线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize :线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销毁,所以这个大小只增不减。
getActiveCount :获取活动的线程数。
3. 线程池的实现原理
当向一个线程池提交任务之后线程池是如何工作的呢?我们来看看线程池的主要处理流程。
线程池的处理流程如下: 由图分析,当提交一个新任务时,线程池的处理流程如下:
- 线程池判断核心线程池里的线程是否都在执行任务。如果不是则创建一个新的工作线程来完成任务。如果核心线程都在执行任务,则进入下一个执行流程。
- 线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里,如果工作队列也满了,则进入下一个执行流程。
- 线程池判断线程池的最大线程数是否已满。如果没有,则创建一个工作线程来执行任务。如果工作队列已满,则进入下一个执行流程。
- 线程池最大线程数也满了线程池则会安照执行策略对无法执行的任务进行处理。
ThreadPoolExecutor执行示意图: (1)、如果当前运行的线程少于corePoolSize ,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
(2)如果运行的线程等于或多于corePoolSize ,则将任务加入到BlockingQueue 。
(3)如果无法将任务加人到BlockingQueue (队列已满)则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4)如果创建新线程将使当前运行的线程超出maximumPoolSize ,任务将被拒绝,并调用RejectedExcutionHandler.rejectedExecution() 方法。
源码如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
在执行execute方法是调用了addWorker方法,下面我们来看看addWoreker方法的源码:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get();;) {
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int c = ctl.get();
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.getState() != Thread.State.NEW)
throw new IllegalThreadStateException();
workers.add(w);
workerAdded = true;
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
这个方法的源代码很长我们就简单的了解一下它的的参数和功能
接收的参数:
addWorker方法接收两个参数第一个参数就是我们要执行的Runnable任务对象,第二个是是否使用核心线程数作为边界,如果为false则将最大线程数作为边界。
功能:
addWorker方法的作用是检查是否可以根据当前池状态和给定的边界(core或maximum)添加新的worker。如果是,将相应地调整worker计数,如果可能,将创建并启动一个新的worker,并将firstTask也就是传进来的Runnable任务对象作为它的第一个任务运行。如果线程池已停止或符合关闭条件,则此方法返回false。如果线程工厂在请求时创建线程失败,它也返回false。如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是thread .start()中的OutOfMemoryError),方法将会回滚。
工作线程:线程池创建线程时,会将线程封装成工作线程Worker. 我们在add Worker方法中可以看到
w = new Worker(firstTask);
final Thread t = w.thread;
Worker在执行行完任务后,还会循环获取工作队列里的任务来执行,在Worker类的run方法中调用 runWorker(this)方法我们可以在该方法中可以看出来。
public void run() {
runWorker(this);
}
runWorker方法的部分源码
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
4.四种常见线程池
4.1 SingleThreadPool
SingleThreadPool是只有一个线程的单一线程池。
在Executor框架中提供了两个静态方法用来创建SingleThreadPool线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
单一线程创建一个线程执行任务
ExecutorService threadPool = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 5; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
运行结果:
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的有界队列。
应用场景:适用于需要保证顺序的执行各个任务,并且在任意时间点,不会有多个线程的。不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。
4.2 FixedThreadPool
FixedThreadPool是一个固定数量的线程池。
在Executor框架中提供了两个静态方法用来创建FixedThreadPool线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
创建一个固定数量的线程池执行任务
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i = 1; i <= 5; i++) {
executorService.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
executorService.shutdown();
执行结果:
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
特点:定长的线程池,有核心线程,核心线程的即为最大的线程数量,没有非核心线程。
应用场景:控制线程最大并发数,需要限制当前线程数量。
4.3 CachedThreadPool
CachedThreadPool是大小无界的线程池,遇强则强遇弱则弱,你要多少个线程我就给你创建多少个线程。
在Executor框架中提供了两个静态方法用来创建CachedThreadPool线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
遇强则强遇弱则弱
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 1; i <= 10; i++) {
executor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
executor.shutdown();
测试结果:
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-4
pool-1-thread-3
pool-1-thread-6
pool-1-thread-9
pool-1-thread-7
pool-1-thread-8
pool-1-thread-10
特点:可缓存的线程池,该线程池中没有核心线程,非核心线程的数量为Integer.max_value,就是无限大,当有需要时创建线程来执行任务,没有需要时回收线程。
应用场景:适用于耗时少,任务量大的情况。
4.4 ScheduledThreadPool
周期性执行任务的线程池,按照某种特定的计划执行线程中的任务,有核心线程,但也有非核心线程,非核心线程的大小也为无限大。适用于执行周期性的任务。
在Executor框架中提供了两个静态方法用来创建ScheduledThreadPool线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
ExecutorService executor = Executors.newScheduledThreadPool(3);
for (int i = 1; i <= 9; i++) {
executor.execute(()->{
System.out.println(Thread.currentThread().getName());
});
}
executor.shutdown();
}
四种线程池对比
类型 | 池内线程类型 | 池类线程数量 | 应用场景 |
---|
SingleThreadPool | 核心线程 | 一个 | 不需要处理线程同步问题 | ScheduledThreadPool | 核心线程&非核心线程 | 核心线程固定非核心线程无限制 | 执行定时/周期性任务 | CachedThreadPool | 非核心线程 | 不固定,可无限大 | 执行数量多耗时少 | FixedThreadPool | 核心线程 | 固定 | 控制线程最大并发数 |
总结
Executors 的 4 个功能线程池虽然方便,但现在已经不建议使用了,而是建议直接通过使用 ThreadPoolExecutor 的方式,这样的处理方式让写的程序员更加明确线程池的运行规则,规避资源耗尽的风险。
其中 Executors 的功能线程有如下弊端:
- FixedThreadPool 和 SingleThreadExecutor:主要问题是堆积的请求处理队列均采用 LinkedBlockingQueue,可能会耗费非常大的内存,甚至 OOM。
- CachedThreadPool:主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。
文章参考java并发编程的艺术
|