ThreadPool 线程池
游泳池
三大方法:
- newSingleThreadExecutor(常用)(单个)–>一次只允许一个人下水游泳
- newFixedThreadPool(常用)(固定)–>一次只允许固定数目的人下水游泳
- newCachedThreadPool(常用)(可扩容)–>所有人都可以下水游泳
七大参数:
- corePoolSize -->游泳池最佳游泳人数,不会产生拥挤
- maximumPoolSize–>游泳池最多可容忍的人数
- keepAliveTime–>当游泳池的人数超过最佳游泳人数。其他人的等待的时间,及这些人多就会离开
- unit,–>等待时间的单位,可以是毫秒、秒、分钟、小时和天,等等
- workQueue–>等待队列,游泳池中的人数超过最佳游泳人数时,想游泳的人在旁边排队
- threadFactory–>泳池管理者,可以管理进入游泳池的人数
- handler–>拒绝策略
四大策略:
- CallerRunsPolicy -->直接将此人交给售票处,不做任何处理
- AbortPolicy -->直接抛出RejectedExecutionException异常,表示游泳池人数满了
- DiscardPolicy–>任务直接丢弃,不做任何处理。表示直接不受理此人,
- DiscardOldestPolicy -->丢弃队列里最旧的那个任务,再尝试执行当前任务,指的是从排队中选择一个排队时间最久的丢弃,在受理此人
线程池简介(三大方法,七大参数,四大策略)
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池,本质上是一种对象池,用于管理线程资源。 在任务执行前,需要从线程池中拿出线程来执行。 在任务执行完成之后,需要把线程放回线程池。 通过线程的这种反复利用机制,可以有效地避免直接创建线程所带来的坏处。
线程池的处理流程: 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
优点- -
? 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。
? 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。
? 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
缺点- -
- 频繁的线程创建和销毁会占用更多的CPU和内存
- 频繁的线程创建和销毁会对GC产生比较大的压力
- 线程太多,线程切换带来的开销将不可忽视
- 线程太少,多核CPU得不到充分利用,是一种浪费
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor 这几个类。
Executor类
当一个任务提交至线程池之后:
1.线程池首先当前运行的线程数量是否少于corePoolSize。如果是,则创建一个新的工作线程来执行任务。如果都在执行任务,则进入2. 2.判断BlockingQueue是否已经满了,倘若还没有满,则将线程放入BlockingQueue。否则进入 3. 如果创建一个新的工作线程将使当前运行的线程数量超过maximumPoolSize,则交给RejectedExecutionHandler来处理任务。
线程池参数说明
? corePoolSize :线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
? maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue.
- CPU密集型
- IO密集型
? keepAliveTime: 线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止;
? unit 存活的时间单位
? workQueue 存放提交但未执行任务的队列 (阻塞队列)
? threadFactory :创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactory
? handler :线程池的拒绝策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务
线程池中,有三个重要的参数,决定影响了拒绝策略:
- corePoolSize - 核心线程数,也即最小的线程数。
- workQueue - 阻塞队列 。
- maximumPoolSize - 最大线程数
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
总结起来,也就是一句话,当提交的任务数大于(workQueue.size() +maximumPoolSize ),就会触发线程池的拒绝策略。
拒绝策略(重点)
步骤:
-
在创建了线程池后,线程池中的线程数为零 -
当调用 execute()方法添加一个请求任务时,线程池会做出如下判断: 2.1 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务; 2.2 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列; 2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务; 2.4 如果队列满了且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。 -
当一个线程完成任务时,它会从队列中取下一个任务来执行 -
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断: 4.1 如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。 4.2 所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
--------------------------------------------><---------------------------------------------------
CallerRunsPolicy: 当触发拒绝策略,只要线程池没有关闭的话,则使用调用线程直接运行任务。一般并发比较小,性能要求不高,不允许失败。但是,由于调用者自己运行任务,如果任务提交速度过快,可能导致程序阻塞,性能效率上必然的损失较大
AbortPolicy: 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息。线程池默认的拒绝策略。必须处理好抛出的异常,否则会打断当前的执行流程,影响后续的任务执行。
DiscardPolicy: 直接丢弃,其他啥都没有
DiscardOldestPolicy: 当触发拒绝策略,只要线程池没有关闭的话,丢弃阻塞队列 workQueue 中最老的一个任务,并将新任务加入
Executors
Executors是一个线程池工厂,提供了很多的工厂方法
// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();
线程池的方法与创建
newCachedThreadPool(常用)(可扩容)
作用:创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程.
特点:
? 线程池中数量没有固定,可达到最大值(Interger. MAX_VALUE)
? 线程池中的线程可进行缓存重复利用和回收(回收默认时间为 1 分钟)
? 当线程池中,没有可用线程,会重新创建一个线程
场景: 适用于创建一个可无限扩大的线程池,服务器负载压力较轻,执行时间较短,任务多的场景
public class MyThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println("thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
newFixedThreadPool(常用)(固定)
作用:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。
特征:
? 线程池中的线程处于一定的量,可以很好的控制线程的并发量
? 线程可以重复被使用,在显示关闭之前,都将一直存在
? 超出一定量的线程被提交时候需在队列中等待
场景: 适用于可以预测线程数量的业务中,或者服务器负载较重,对线程数有严格限制的场景
public class MyThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println("thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
newSingleThreadExecutor(常用)(单个)
作用:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。与其他等效的 newFixedThreadPool 不同,可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。
特征:
线程池中最多执行 1 个线程,之后提交的线程活动将会排在队列中以此 执行
场景: 适用于需要保证顺序执行各个任务,并且在任意时间点,不会同时有多个线程的场景
public class MyThreadPool {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
System.out.println("thread id is: " + Thread.currentThread().getId());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
线程池的提交方式
往线程池中提交任务,主要有两种方法,execute()和submit()。
execute()
package java.util.concurrent;
public interface Executor {
/*
在将来的某个时间执行给定的命令。 该命令可以在一个新线程,一个合并的线程中或在调用线程中执行,由Executor实现。
*/
void execute(Runnable command);
}
execute()用于提交不需要返回结果的任务
public class MyThreadPool {
public static void main(String[] args){
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
System.out.println("我想返回结果");
});
}
}
submit()
package java.util.concurrent;
public interface ExecutorService extends Executor {
/*
提交值返回任务以执行,并返回代表任务待处理结果的Future。 未来的get方法将在成功完成后返回任务的结果。
如果您想立即阻止等待任务,您可以使用result = exec.submit(aCallable).get();格式的result = exec.submit(aCallable).get();
*/
<T> Future<T> submit(Callable<T> task);
/*
提交一个可运行的任务执行,并返回一个表示该任务的未来。 未来的get方法将返回null 成功完成时。
*/
<T> Future<T> submit(Runnable task, T result);
/*
提交一个可运行的任务执行,并返回一个表示该任务的未来。 未来的get方法将在成功完成后返回给定的结果。
*/
Future<?> submit(Runnable task);
}
submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException。
public class MyThreadPool {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor1 = Executors.newSingleThreadExecutor();
Future<Long> future = executor1.submit(() -> {
System.out.println("我会返回结果,快用我");
return System.currentTimeMillis();
});
System.out.println("task execute time is: " + future.get());
}
}
线程池的关闭方式
在线程池使用完成之后,我们需要对线程池中的资源进行释放操作。 ExecutorService类下有两个关闭缓冲池的方法。
public interface ExecutorService extends Executor {
/*
启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
如果已经关闭,调用没有额外的作用。 此方法不等待以前提交的任务完成执行。
使用awaitTermination做到这一点。
*/
void shutdown();
/*
尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。
此方法不等待主动执行的任务终止。 使用awaitTermination做到这一点。
除了努力尝试停止处理积极执行任务之外,没有任何保证。
例如,典型的实现将通过Thread.interrupt()取消,所以无法响应中断的任何任务永远不会终止。
*/
List<Runnable> shutdownNow();
}
shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。 shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。 另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()和isTerminated,分别表示是否关闭和是否终止。
public interface ExecutorService extends Executor {
/*
如果这个执行者已被关闭,则返回 true 。
*/
boolean isShutdown();
/*
如果所有任务在关闭后完成,则返回true 。
请注意, isTerminated从不true ,除非shutdown或shutdownNow首先被调用。
*/
boolean isTerminated();
}
|