目录
一、线程池优点
二、线程池原理
创建线程池的方式
ThreadPoolExecutor参数说明
队列说明
线程池的执行流程
线程池拒绝策略
三、四种线程池解析
Executors.newSingleThreadExecutor();
Executors.newFixedThreadPool
Executors.newCachedThreadPool()
Executors.newScheduledThreadPool
四、自定义线程池
五、如何确定线程池线程数
一、线程池优点
1、降低资源消耗:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统稳定性,通过重复利用已创建的线程可以降低线程的创建和销毁造成的消耗。
2、提高响应速度:任务到达时,无需等待线程创建即可立即执行。
3、提高线程的可管理性:线程池提供了一种限制、管理资源的策略,维护一些基本的线程统计信息,如已完成任务量等,通过线程池可以对线程资源进行资源统一分配、监控和调优。
二、线程池原理
创建线程池的方式
- ?Executors.newCachedThreadPool(); 可缓存线程池
- ?Executors.newFixedThreadPool();固定数量的线程池
- ?Executors.newScheduledThreadPool() ; 定时执行的线程池
- ?Executors.newSingleThreadExecutor(); 单线程线程池
线程池的创建有四种方式,但是底层都是使用了ThreadPoolExecutor构造函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor参数说明
corePoolSize--------------核心线程数,必须大于等于0。
?maximumPoolSize--------------最大线程数,必须大于核心线程数。
keepAliveTime---------------------非核心线程存活的时间,当非核心线程在空闲时间超过这个值之后,会关闭非核心线程。
TimeUnit------------------------------时间单位。纳秒,微秒,毫秒,秒,分,小时,天TimeUnit.NANOSECONDS、TimeUnit.MICROSECONDS、TimeUnit.MILLISECONDS、TimeUnit.SECONDS、TimeUnit.MINUTES、TimeUnit.HOURS、TimeUnit.DAYS
BlockingQueue<Runnable> workQueue--------------存放线程的阻塞队列。
RejectedExecutionHandler handler----------------------线程池拒绝策略的处理类
队列说明
- 1.ArrayBlockingQueue:有界队列,基于数组结构,按照队列FIFO原则对元素排序;
- 2.LinkedBlockingQueue:无界队列,基于链表结构,按照队列FIFO原则对元素排序,Executors.newFixedThreadPool()使用了这个队列; 无界默认是Integer.MAX_VALUE,有界则是 可以自己定义。
- 3.SynchronousQueue:同步队列,该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直被阻塞,Executors.newCachedThreadPool()使用了这个队列;
- 4.PriorityBlockingQueue:优先级队列,具有优先级的无限阻塞队列。
以ArrayBlockingQueue为例如下
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.*;
/**
* @author xiaojie
* @version 1.0
* @description: 缓存功能的线程池
* @date 2021/12/12 20:31
*/
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ArrayBlockingQueue<String> arrayBlockingQueue = new ArrayBlockingQueue<String>(5, true);
arrayBlockingQueue.offer("a");
arrayBlockingQueue.offer("b");
arrayBlockingQueue.offer("c");
arrayBlockingQueue.offer("d");
arrayBlockingQueue.offer("e");
System.out.println(arrayBlockingQueue.size());
System.out.println(arrayBlockingQueue.poll());//从队列中取值之后,删除数据
System.out.println(arrayBlockingQueue.size());
System.out.println(arrayBlockingQueue.peek());//取值后不删除数据
System.out.println(arrayBlockingQueue.size());
}
}
线程池的执行流程
- ?如果当前工作线程数小于核心线程数,执行器总是优先创建一个新的线程,而不是从线程队列中获取一个空闲线程。
- 如果线程池中的总任务数量大于核心线程数量,新接手的任务将会存入阻塞队列,一直到阻塞队列满为止。在核心线程已用完,而阻塞队列未满的情况下线程池不会创建新线程,而是复用核心线程。
- 当完成一个任务时,执行器优先从阻塞队列中获取下一个任务开始执行,一直到阻塞队列为空,其中所有的缓存任务被取光。
- 在核心线程已经用完并且阻塞队列也已经满了的情况下,如果线程池接收新的任务,将会为新任务创建一个新的线程(非核心线程),并且会立即执行新任务。
- 在核心线程用完,阻塞队列已满,一直会创建新的线程直到线程池中的线程总数超过最大线程数。如果超过最大线程数,线程池就会拒绝接收新任务,当新任务到来时,执行拒绝策略。
线程池拒绝策略
两种情况会拒绝处理任务: 1.当线程数已经达到maxPoolSize,并且队列已满,会拒绝新任务。 2.当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。
线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置,默认是AbortPolicy,会抛出异常。 ThreadPoolExecutor类有几个内部实现类来处理拒绝任务:
- 1.AbortPolicy 丢弃任务,抛运行时异常
- 2.CallerRunsPolicy 执行任务
- 3.DiscardPolicy 忽视,什么都不会发生
- 4.DiscardOldestPolicy 从队列中踢出最先进入队列的任务
- 5.实现RejectedExecutionHandler接口,可自定义处理器
三、四种线程池解析
Executors.newSingleThreadExecutor();
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author xiaojie
* @version 1.0
* @description: 单线程线程池
* @date 2021/12/12 22:14
*/
public class SingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i=0;i<5;i++){
int finalI = i;
executorService.execute(() -> {
try {
Thread.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("这是一个单线程线程池的demo,线程名称:"+Thread.currentThread().getName()+">>>>>>>"+ finalI);
});
}
//关闭线程池
executorService.shutdown();
}
}
?
?由执行结果可知
1、单线程线程池中的任务是按照提交任务的顺序执行的。
2、池中唯一的线程存活时间是无限制的。
3、当池中的线程正在繁忙时,新提交的任务会进入内部阻塞队列,并且阻塞队列是无界的(LinkedBlockingQueue<Runnable>)。
适用场景
单线程线程池适用于任务按照提交次序,一个任务一个任务的逐个执行的场景。
Executors.newFixedThreadPool
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author xiaojie
* @version 1.0
* @description: 固定长度的线程池
* @date 2021/12/12 22:29
*/
public class FixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("这是一个固定长度的线程池的demo,线程名称:" + Thread.currentThread().getName() + ">>>>>>>" + finalI);
}
});
}
executorService.shutdown();
}
}
?
?有执行结果可知
1、并不是按照任务提交顺序执行的。
2、如果线程数量没有达到固定数量,每次提交都会创建新的线程,直到达到最大数量
3、如果线程洗的大小达到固定数量就会保持不变,如果某个线程因为异常而结束,那么线程池会补充一个新的线程。
4、如果接收到新任务没有空闲线程也会进入阻塞队列(LinkedBlockingQueue<Runnable>)。
适用场景:需要任务长期执行的场景,固定数量的线程数能够避免频繁的创建和销毁线程,例如CPU密集型的任务,在CPU被工作线程长时间占用的情况下,能确保尽可能减少线程分配。
弊端:
内部使用无界队列来存放任务,当大量任务超过线程池能处理的最大容量时队列无限增大,使服务器资源迅速耗尽。
Executors.newCachedThreadPool()
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.*;
/**
* @author xiaojie
* @version 1.0
* @description: 缓存功能的线程池
* @date 2021/12/12 20:31
*/
public class CachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
int finalI = i;
executorService.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("这是一个缓存功能的线程池的demo,线程名称:" + Thread.currentThread().getName() + ">>>>>>>" + finalI);
}
});
}
executorService.shutdown();
}
}
?
?执行结果可知
1、当任务提交时,如果线程繁忙,会创建新的线程执行任务。
2、对线程池的大小没有限制,底层使用SynchronousQueue<Runnable>队列。
3、如果部分线程空闲,线程数量超过了任务数量,就会回收空闲(60秒不执行任务)线程。
适用场景
需要快速处理突发性强,耗时较短的任务场景,例如Netty的NIO场景,RESTAPI瞬时削峰。可缓存线程池的线程数量不固定,有空闲线程就会自动回收,接收到新任务时判断是否有空闲线程,如果没有就直接创建新的线程。
弊端
线程池没有最大线程数量限制,如果大量的异步任务同时执行,可能会因创建线程过多而导致资源耗尽。
Executors.newScheduledThreadPool
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author xiaojie
* @version 1.0
* @description: 定时,延迟线程池
* @date 2021/12/12 23:08
*/
public class ScheduledThreadPoolDemo {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i++) {
int finalI = i;
scheduledExecutorService.schedule(() -> {
System.out.println("这是一个延迟线程池的demo,延迟5秒后执行,线程名称:" + Thread.currentThread().getName() + ">>>>>>>" + finalI);
}, 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("这个方法延迟1秒后执行,然后每隔2秒后重复执行");
}
}, 1, 1, TimeUnit.SECONDS);
}
// scheduledExecutorService.shutdown();
}
}
?使用?DelayedWorkQueue()队列实现
?适用场景
周期性的执行任务的场景,例如一些定时任务的实现,Springboot的任务调度。
四、自定义线程池
Executors创建线程的潜在问题
1、创建newFixedThreadPool的潜在问题在于工作队列,使用LinkedBlockingQueue(无界队列),如果任务的提交速度大于任务的处理速度,就会造成大量的任务在阻塞队列中等待,如果阻塞队列很大,很有可能导致OOM(内存溢出)。
2、创建newSingleThreadExecutor和newFixedThreadPool线程池一样,同样使用LinkedBlockingQueue(无界队列),如果任务的提交速度大于任务的处理速度,就会造成大量的任务在阻塞队列中等待,如果阻塞队列很大,很有可能导致OOM(内存溢出)。
3、newCachedThreadPool线程池的潜在问题在于其核心线程数为0,最大线程数为Integer.MAX_VALUE,使用SynchronousQueue同步队列。如果同时执行大量的任务,就意味会创建大量的线程,可能导致OOM,甚至导致CPU资源耗尽。
4、ScheduledThreadPoolExecutor最大线程数也是Integer.MAX_VALUE,和newCachedThreadPool存在同样的问题。
自己创建线程池
package com.xiaojie.juc.thread.pool;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author xiaojie
* @version 1.0
* @description: 自定义创建线程池
* @date 2021/12/12 23:56
*/
public class MyThreadPool {
//定义工作队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
public ThreadPoolExecutor threadPoolExecutor(int corePoolSize, int maximumPoolSize, Long keepAliveTime) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
0L, TimeUnit.MILLISECONDS,
workQueue);
}
public static void main(String[] args) {
MyThreadPool myThreadPool = new MyThreadPool();
ThreadPoolExecutor executor = myThreadPool.threadPoolExecutor(3, 10, 60L);
for (int i = 0; i < 20; i++) {
executor.execute(() -> {
//最大可以允许20个任务,超过的将进行拒绝策略
System.out.println("通过ThreadPoolExecutor 定义的线程池" + Thread.currentThread().getName());
});
}
}
}
五、如何确定线程池线程数
1、由于IO密集型任务的CPU使用率低,导致线程空闲时间很多,因此通常需要开CPU 核心数两倍的线程。当IO线程空闲时,可以启用其他线程继续使用CPU,来提高CPU的利用率。
2、如果是CPU密集型,CPU密集型的任务虽然可以并行的执行,但是并行的任务越多,花在线程切换的时间就越多,CPU执行效率就越低,所以一般设置线程数等于CPU的核心数。
3、混合型既要满足IO又要满足CPU密集的计算公式
最佳线程数=((线程等待时间+线程CPU时间)/线程CPU时间)*CPU核数
参考:《JAVA高并发核心编程(卷2):多线程、锁、JMM、JUC、高并发设计》-尼恩编著
|