详细介绍了Executors线程池工具类的使用,以及四大内置线程池。
系列文章:
- Java Executor源码解析(1)—Executor执行框架的概述
- Java Executor源码解析(2)—ThreadPoolExecutor线程池的介绍和基本属性【一万字】
- Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】
- Java Executor源码解析(4)—ThreadPoolExecutor线程池submit方法以及FutureTask源码【一万字】
- Java Executor源码解析(5)—ThreadPoolExecutor线程池其他方法的源码
- Java Executor源码解析(6)—ScheduledThreadPoolExecutor调度线程池源码解析【一万字】
- Java Executor源码解析(7)—Executors线程池工厂以及四大内置线程池
Executors可以看作一个工具类,里面提供了好多静态方法,这些方法根据用户选择返回不同的内置线程池实例,或者返回线程工厂,或者将runnable转换为callable。
《阿里巴巴java开发手册》中不推荐使用Executors创建线程池,有这样的强制编程规约:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
Executors 返回的线程池对象的弊端如下:
- FixedThreadPool 和 SignalThreadPool : 允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
- CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
1 内置线程池
Executors提供了四种默认线程池实现!
1.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads)
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
corePoolSize与maximumPoolSize相等,即最大线程数就是核心线程数;keepAliveTime = 0 该参数默认对核心线程无效,因此不会超时,在线程池被关闭之前,池中被创建的线程将一直存在。
workQueue为LinkedBlockingQueue,是一个无界阻塞队列,队列容量为Integer.MAX_VALUE。如果任务提交速度持续大余任务处理速度,会造成队列堆积大量任务,很有可能在执行拒绝策略之前就造成内存溢出。
适用于持续不断地提交任务的场景,并且要求任务提交速度不得超过线程处理速度。
1.2 newCachedThreadPool
public static ExecutorService newCachedThreadPool()
创建对所有线程都带有超时时间的线程池。对于执行很多短期异步任务的程序而言,这个线程池通常可提高程序性能。
调用execute等方法将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的(均在工作中),则创建一个新线程并添加到池中。将会终止并移除那些已有60秒钟未工作的线程。因此,长时间保持空闲的线程池不会使用任何资源。线程池会根据执行的情况,在程序运行时自动调整线程数量,这里就是可变线程数量的线程池的特点。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize = 0,maximumPoolSize = Integer.MAX_VALUE,即没有核心线程,线程数量最大为Integer. MAX_VALUE;keepAliveTime = 60s,对所有的线程空闲60s后清理。
workQueue 为 SynchronousQueue 阻塞同步队列,该队列没有容量,因此如果有新的任务进来并且目前的线程都在工作中,那么会立即创建新线程执行任务;
适用于快速处理大量耗时较短的任务,如果任务耗时较长,极端情况下CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
1.3 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。内部就是一个ScheduledThreadPoolExecutor实例!
public static ScheduledExecutorService
newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
1.4 newSignalThreadExecutor
public static ExecutorService newSingleThreadExecutor()
返回只有固定一个线程的线程池!
这里多了一层FinalizableDelegatedExecutorService包装,这一层有什么用呢?它和newFixedThreadPool“固定容量”的线程池有什么区别呢?是否可以轻易的改变线程数量呢?写个demo来测试一下:
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService fixedExecutorService = Executors.newFixedThreadPool(1);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) fixedExecutorService;
System.out.println(threadPoolExecutor.getCorePoolSize());
threadPoolExecutor.setCorePoolSize(8);
System.out.println(threadPoolExecutor.getCorePoolSize());
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
ThreadPoolExecutor threadPoolExecutor2 = (ThreadPoolExecutor) singleExecutorService;
}
}
为什么不能强转?原理很简单,FinalizableDelegatedExecutorService并没有和ThreadPoolExecutor产生继承关系。
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) {
e = executor;
}
}
2 默认线程工厂
public static ThreadFactory defaultThreadFactory()
Executors提供了一个默认的线程工厂实现!线程池中线程的默认名字的由来以及线程所属线程组等属性都是通过线程工厂设置的!
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
3 Runnable转换为Callable
public static < T > Callable< T > callable(Runnable task, T result)
返回 Callable 对象,调用它时可运行给定的任务并返回给定的结果。这在把需要 Callable 的方法应用到其他无结果的操作时很有用。
public static Callable< Object > callable(Runnable task)
返回 Callable 对象,调用它时可运行给定的任务并返回 null。
原理很简单,就是适配器模式的应用,返回的是一个RunnableAdapter适配器类的实例。
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
4 总结
本系列文章我们学习了Executor执行框架的大概架构,以及ThreadPoolExecutor、FutureTask、ScheduledThreadPoolExecutor 、Executors等核心类的原理。阅读线程池的实现源码可以让我们不仅知道某个方法是什么,还能知道为什么,以及怎么实现的,在这过程中我们还能接触到比如适配器模式,策略模式、Leader-Follower线程模型等一些偏理论的东西的具体应用。
如果你读完本系列文章,你应该可以知道一些关于线程池的不常见的知识,比如:
- 线程池的最大线程数量一定会小于等于maximumPoolSize吗?为什么?
- 核心线程可以应用keepAliveTime设置的超时时间吗?有例外吗?
- 线程池有用到锁吗?有几种锁?有什么用?
- 如果一个工作线程在执行任务过程中抛出了异常,那么这个线程会怎样呢?
- 延迟/周期任务的原理是什么?scheduleWithFixedDelay和scheduleWithFixedDelay 方法的区别是什么?
- 正常线程池中设置的延迟任务一定会在到达你设置的延迟时间之时运行吗?
- FutureTask的原理?
Executor执行框架类容很丰富,功能很多,本次仅仅讲解了一部分,还有一些包括ThreadPoolExecutor、ScheduledThreadPoolExecutor的某些方法也没有讲解,使用时建议查看相关api文档做更全面的了解!另外JDK1.7的时候线程池新增了ForkJoinPool分治框架,这是对线程池的增强,后面的文章我们会讲解ForkJoinPool的源码!
相关文章:
- AQS:JUC—五万字的AbstractQueuedSynchronizer(AQS)源码深度解析与应用案例
- interrupt:Java线程中断与停止线程详解以及案例演示
- ReentrantLock:JUC—ReentrantLock源码深度解析
- PriorityBlockingQueue:JUC—两万字的PriorityBlockingQueue源码深度解析
- DelayQueue:JUC—DelayQueue源码深度解析
- LockSupport:JUC—LockSupport以及park、unpark方法底层源码深度解析
其他JUC相关文章比如阻塞队列、锁等等,在我的专栏中都有!
如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!
|