Executor 框架用于任务的提交和执行解耦,任务的提交交给Runnable 或者Callable,而Executor框架用来处理任务。Executor 框架中最核心的成员变量是ThreadExecutor,它是线程池的核心实现类。
ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运行的线程数少于corePoolSize,则创建新线程来处理任务;如果等于或者多于coorPoolSize,则不会创建。如果调用线程池的prestartAllcoreThread 方法,线程池会提前创建并启动所有的核心线程来等待任务。
线程池允许创建的最大线程数。如果任务队列满了并且线程数小于maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。
非核心线程闲置的超时时间,超过这个时间则回收。如果设置allowCoreThreadTimeOut属性为ture时,keepAliveTime也会应用到核心线程上。
keepAliveTime参数的时间单位。可选的单位有天,小时,分钟,秒,毫秒。
任务队列。如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务队列是BlockingQueue 类型的,也就是阻塞队列。
线程工厂。可以用线程工厂给每个创建处理的线程设置名字。一般情况下无须设置该参数。
饱和策略。这是当任务队列和线程池都满了时所采取的应对策略,默认是AbordPolicy,表示无法处理新任务,并抛出RejectExecutionException 异常。
- CallerRunsPolicy:用调用者所在的线程来处理任务。此策略提供简单的反馈控制,能够减缓新任务的提交速度。
- DiscardPolicy:不能执行的任务,并将该任务删除。
- DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。
线程池的生命周期
表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。
线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown后会进入SHUTDOWN状态。
线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了shutdownNow后会进入STOP状态。
当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。
线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。
线程池的处理流程和执行原理
处理流程
- 提交任务后,线程池先判断线程数是否达到了核心线程数,如果未达到核心线程数,则创建核心线程处理任务;否则,就执行下一步操作。
- 接着线程池判断任务队列是否满了。如果没满,则将任务添加到任务队列中;否则,就执行下一步操作。
- 接着因为任务队列满了,线程池就判断线程数是否达到了最大线程数。如果未达到,则创建非核心线程处理任务;否则,就执行饱和策略,默认会抛出RejectedExecutionException 异常。
执行原理
- 如果线程池中的线程数未到达核心线程数,则创建核心线程处理任务。
- 如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。
- 如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
- 如果线程数超过了最大线程数,则执行饱和策略。
线程池的种类
通过直接或间接地配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor,其中有4种线程池比较常用,它们分别是FixThreadPool、CachedThradPool、SingleThreadExecutor和ScheduledThredPool。
FixThreadPool
通过Executors 的newFixedThreadPool 方法来创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,它们并不会被回收,除非线程池被关闭了。当所有的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。由于FixThreadPool只有核心线程并且这些核心线程不会被回收,这意味着它能够更加快速地响应外界的请求。
实例
public class ThreadPoolDemo {
public static void main(String[] args) {
MRunable mRunable=new MRunable();
ExecutorService fixThreadPool=Executors.newFixedThreadPool(4);
fixThreadPool.execute(mRunable);
}
static class MRunable implements Runnable {
public void run() {
System.out.println("开始执行任务");
try {
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("暂停执行2秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务结束");
}
}
}
源码分析
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixThreadPool的corePoolSize和maximumPoolSize都设置为创建FixThreadPool指定的参数nThreads,也就意味着FixedThredPool 只有核心线程,并且数量固定的,没有非核心线程。keepAliveTime设置为0L意味着多余的线程会被立即终止。任务队列采用了无界的阻塞队列LinkBlockingQueue。
FixThreadPool执行示意图
由图可知,当执行excute方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkBlockingQueue中。FixThreadPool 就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过corePoolSize时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行。
CachedThradPool
通过Executors 的newCachedThradPool 方法来创建。它是一种数量不定的线程池。它只有非核心线程,并且其最大线程数为Integer.MAX_VALUE。当线程池中的线程都处于活动的状态时,线程池会创建新的线程来处理新任务,否则就会利用空闲的线程来处理新任务。
实例
public class ThreadPoolDemo {
public static void main(String[] args) {
MRunable mRunable=new MRunable();
ExecutorService cacheThreadPool=Executors.newCachedThreadPool();
cacheThreadPool.execute(mRunable);
}
static class MRunable implements Runnable {
public void run() {
System.out.println("开始执行任务");
try {
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("暂停执行2秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务结束");
}
}
}
源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThradPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着CachedThradPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务的最长时间为60s,超过60s的闲置线程就会被回收。阻塞队列SynchronousQueue是一个不存储元素的阻塞队列,每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。
CachedThradPool执行示意图
当执行execute 方法时,首先会执行SynchronousQueue 的offer 方法来提交任务,并且查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行SynchronousQueue的poll方法,等待SynchronousQueue中断提交任务。如果超过60s没有新任务提交到SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize是无界的,所以如果提交的任务大于线程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThradPool比较适合于大量的需要立即处理并且耗时比较少的任务。
SingleThreadExecutor
通过Executors 的newSingleThreadExecutor 方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor的意义在于统一所有的外界任务到一个线程中,使得这些任务之间不需要处理线程同步的问题。
实例
public class ThreadPoolDemo {
public static void main(String[] args) {
MRunable mRunable=new MRunable();
ExecutorService singleThreadPool=Executors.newSingleThreadExecutor();
singleThreadPool.execute(mRunable);
}
static class MRunable implements Runnable {
public void run() {
System.out.println("开始执行任务");
try {
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("暂停执行2秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务结束");
}
}
}
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSIze 和maximumPoolSize 都为1,意味着SingleThreadExecutor 只有一个核心线程。
SingleThreadExecutor执行示意图
当执行execute 方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。
ScheduledThredPool
通过通过Executors 的newScheduledThredPool 方法来创建。它的核心线程是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时会被立即回收。ScheduledThredPool 这类线程池主要用于执行定时任务和具有周期的重复任务。
实例
public class ThreadPoolDemo {
public static void main(String[] args) {
MRunable mRunable=new MRunable();
ScheduledExecutorService scheduleThreadPool=Executors.newScheduledThreadPool(4);
scheduleThreadPool.scheduleAtFixedRate(mRunable, 0, 3000, TimeUnit.MILLISECONDS);
}
static class MRunable implements Runnable {
public void run() {
System.out.println("开始执行任务");
try {
TimeUnit.MILLISECONDS.sleep(2000);
System.out.println("暂停执行2秒");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行任务结束");
}
}
}
源码分析
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor的构造方法最终调用的是ThreadPoolExecutor 的构造方法。corePoolSize是传进来的固定的数值,maximumPolSize 的值是Integer.MAX_VALUE。
ScheduledThredPool执行示意图
当执行ScheduledThreadPoolExecutor的ScheduleAtFixedRate 或者ScheduleWithFixedDelay方法时,会向DelayedWorkQueue添加一个实现RunnableScheduledFuture接口。ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去DelayedWorkQueue中取ScheduledFutureTask,然后去执行任务。如果运行的线程到达了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。当执行完任务后,会将ScheduledFutureTask中的time变量改为下次要执行的时间并放回到DelayedWorkQueue中。
|