Executor
Executor类的实例对象是一个执行已提交的任务的对象。 该接口提供了一种将任务的提交与每个任务的运行机制分离的方法,包括线程使用细节、调度细节等。
通常将为任务显示地创建线程替换为使用Executor来执行任务。比如,当有多个线程任务时,为每个任务创建一个线程并启动new Thread(new Runnable(){}).start() 的代码,可以替换为:
Executor executor = anExecutor;
executor.execute(new Runnable(){});
executor.execute(new Runnable(){});
但是,Executor并没有严格要求任务的执行必须是异步的。最简单的情况,是Executor可以在应用execute(runnable)方法的线程中立即运行提交的任务:
class DirectExecutor implements Executor{
public void execute(Runnable r){
r.run();
}
}
更为典型的情况则是,Executor会在一些其他线程中运行提交的任务、而非在调用execute?方法的线程中。以下代码示例的是一个为每一个任务创建一个新的线程的执行器:
class ThreadPerTaskExecutor implements Executor{
public void execute(Runnable r){
new Thread(r).start();
}
}
许多Executor的实现类会给任务的执行方式和时间做一些限制和要求。以下代码所实现的一个执行器会将提交上来的所有任务序列化地在提交给另一个Executor实例,说明这是一个复合执行器:
class SerialExecutor implements Executor{
Executor executor;
final Queue<Runanble> tasks = new ArrayDeque<>();
Runnable active;
public SerialExecutor(Executor executor){
this.executor = executor;
}
public synchronized void execute(Runnable r){
tasks.offer(r->{
try{
r.run();
}finally{
scheduleNext();
}
});
if(active == null) { scheduleNext(); }
}
protected synchronized void scheduleNext(){
if((active = tasks.poll())!=null) { executor.execute(active); }
}
}
SerialExecutor的execute方法首先重新封装提交给它的任务r,重新封装出的新任务的run()方法代码逻辑为:
- 首先会执行原来的任务的代码r.run(),
- 然后自动从task队列中poll一个封装后的新任务,并调用另一个executor.execute?方法执行这个新任务
重新封装好提交来的任务后,会检查executor是否正在执行任务,如果没有,就调用scheduleNext()来启动executor执行新任务。
SerialExecutor实现了对提交来的任务的串行化执行。由于scheduleNext()方法是protected的,因此外部方法无法直接调用它来并行地启动另外一个线程。而SerialExecutor对通过execute?方法提交来的任务做了新的封装,使得只有在一个任务运行完毕后才会调用另一个executor的方法运行新的任务。
那么当多次调用SerialExecutor的execute?方法时,会出现什么情况呢?例如在同一个线程中,连续数次调用这个方法,synchronized关键字是可重入锁,execute?方法也会被多次执行,但是其执行的是封装新任务的代码,并非是多次启动另一个executor.execute?方法的代码,启动另一个executor的代码只有在active== null的时候才会执行,那么还是能确保任务的串行化执行。
另外,tasks则相当于用一个queue对任务做了一个缓存,由于防止另外的executor来不及处理提交的任务。并使用synchronized以防多个线程同时访问queue时可能出现数据错误。
同一个package内的另一个接口ExecutorService,继承了Executor接口,更广为使用。 ThreadPoolExecutor类则是一个可供应用程序开发人员扩展的Executor实现类。 Executors类则为以上这些实现类提供了遍历的工厂方法。
Memory consistency effects(内存一致性影响): Actions in a thread prior to submitting a {@code Runnable} object to an {@code Executor} happen-before its execution begins, perhaps in another thread.
public interface Executor{
void execute(Runnable r);
}
ExecutorService
执行组件服务:Executor的实现类,提供了管理终止的方法和为异步任务生成能跟踪运行过程的Future的方法。 ExecutorService能提供了关闭接口,对executorService对象应用了关闭方法后,executorService就会拒绝接受新任务。 当executorService终止后,没有正在运行的任务,没有等待执行的任务,新任务也不能提交了。应当对终止后的executorService应用shutdown方法,以允许回收其资源。
submit方法扩展了Executor.execute方法,它会创建并返回一个Future,应用这个Future可撤销任务或等待任务执行完成。
invokeAny & invokeAll 方法是最常用的执行批量任务的方法,会执行一个集合中的任务,并等待至少一个或者所有任务的完成。
应用程序开发者可以扩展ExecutorCompleionService类,以自定义以上方法。
类Executors提供了创建执行组件的工厂方法。
以下代码展示了一个使用线程池中的线程为网络请求提供服务的简单网络服务器。它使用已经设定好了各种配置的Executors.newFixedThreadPool工厂方法:
public class NetworkService implements Runnable{
private final ExecutorService pool;
private final ServerSocket serverSocket;
public NetworkServece(int port, int poolSize){
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run(){
try{
for(;;){
pool.submit(new Handler(serverSocket.accept()));
}
}catch(Exception e ){
pool.shutdown();
}
}
}
class Handler implements Runnable{
final Socket socket;
public Handler(Socket socket){
this.socket = socket;
}
public void run(){
}
}
以下代码对一个ExecutorService对象实施两阶段的关闭,以确保能撤销时间较长的任务。先对executorService对象应用shutdown方法,拒绝接受新任务;再对其应用shutdowNow方法,以撤销时间过长的任务。
void shutDownAndAwaitTermination(ExecutorService pool){
pool.shutdown();
try{
if(!pool.awaitTermination(60, TimeUnit.SECONDS){
pool.shutdownNow();
if(!pool.awaitTermination(60, TimeUnit.SECONDS){
System.err.println("pool did not terminate");
}
}
}catch(InterruptedException e){
pool.shutdownNow();
Thread.currentThread().interrupt();
}finally{}
}
内存一致性效果:向ExecutorService提交一个Runnable任务或者Callable任务的动作一定发生在ExecutorService运行这个任务的动作之前,而运行这个任务的动作一定又发生在通过Future.get()方法得到这个任务的运行结果之前。
public interface ExecutorService extends Executor{
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable r, T result);
Future<?> submit(Runnable r);
<T> List<Future<T>> invokeAll(Collection<? extends Callable> tasks);
<T> List<Future<T>> invokeAll(Colleciton<? extends Callable> tasks, long timeout, TimeUnit unit);
<T> T invokeAny(Collection<? extends Callable> tasks) throw ExecutionException;
}
Callable
用于创建有返回值并且可以抛出受检异常的任务。 与Runnable接口相同的是:两者都是为了创建需要运行在另外的线程中的任务而设计。但Runnable接口不能返回结果,且不能抛出受检异常。 Executors类提供了将其它形式的任务转换为Callable接口的实例的工具方法。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Executors中提供的将Runnable实例转换为Callable实例的工具:
public class Executors{
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
}
Future
一个Future实例代表一个异步计算的执行结果。 提供了阻塞等待异步计算执行完成后获取结果(get)、撤销计算任务(cancel)、检查异步计算是否已经完成(isDone)、检查计算是否被取消(isCancelled)的接口方法。 如果计算已经执行完成,则无法被取消cancel。
public interface Future<T>{
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture
一个RunnableFuture实例是一个可以代表一个Runnable实例的执行结果的Future对象。 当Runnable的run方法被执行完毕后,Future对象就完成了,并可以访问它得到运行结果。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask
一个可取消的异步计算,FutureTask类提供了Future接口的基本实现,实现了启动和取消计算、查看计算是否已经完成、获取计算结果的接口方法 只有计算执行完毕后,计算结果才能被获取到。如果计算还未完成、get方法将会阻塞当前线程。 计算一旦被执行完成,计算任务不能再被重新启动或者取消、除非应用runAndReset方法。
FutureTask可以用于封装Runnable对象或者Callable对象,因为FutureTask实现了Runnable接口,因此可以提交到executor执行。
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {... }
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call();
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
在FutrueTask类中,间接继承自Future接口的方法(get、cancell、isDone、isCancelled),是客户端方法,被客户端线程调用。 而间接继承自Runnable接口的方法(run),则是会被任务真正的执行线程所调用的方法,比如线程池中的某个工作线程。
那么FutureTask类是如何协调这两种线程呢?对于客户端线程,大致流程如下:
- 首先:某客户端线程,把真正要执行的任务也即一个Runable实例r ,构造成一个FutureTask(Runnable r, V v)实例对象
- 接着:某客户端线程,将这个FutureTask实例提交到一个线程池;或者客户端线程直接创建一个新的线程,来执行这个实现了Ruannble接口的FutureTask实例:
new Thread(futureTask).start(); 。 - 当客户端线程将FutureTask实例提交给了另外的线程或者线程池后,新线程或者线程池会在其后的某个时间点对提交来的FutureTask实例应用其run()方法,但是具体何时执行无法预测。
- 接着:创建并提交了任务的客户端线程,通常会在其后的逻辑中,应用FutureTask实例的客户端方法(get \ cancell \ isDone \ isCancelled) 来等待任务执行完毕、或者获取 run()的结果、或者取消任务。
- 当客户端线程应用了FutureTask实例的get()方法
futureTask.get() ,get()方法就会将客户端线程加入等待队列 waiters(),并使其阻塞,也就是说客户端线程会被阻塞在futureTask对象上:LockSupport.park(this, nanos); 。直到真正执行任务的线程应用r.run() 并执行完毕后,会唤醒这些等待线程(LockSupport.unpark(t) );或者直到客户端线程被中断 t.interrupt() ;或者其它客户端线程对任务应用了cancel()方法 - 当客户端线程应用了FutureTask实例的cancel方法
futureTask.cancel() ,客户端线程会对真正执行任务的线程runner应用runner.interrupt()方法,中断任务的执行,然后会唤醒waiters等待队列中的所有阻塞在get()方法上的客户端线程。
对于真正执行任务的线程:
- 任务的真正执行线程,会应用futureTask的run方法。
- 执行线程在执行futureTask的run方法时,首先会设置对象的属性runner为当前执行线程:
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()) - 接着:调用构造FutureTask实例时的Runnable r的run方法,执行真正的任务逻辑。
- 最后:
LockSupport.unpark(t) 唤醒waiters队列中的所有阻塞在get()方法上的线程。无论r.run是正常执行完毕还是异常结束。
以上内容简单表述了提交任务的客户端线程与任务的执行线程是如何协调的,其中应用了volatile + UNSAFE.compareAndSwapXXX、LockSupport.parkNanos(this, nanos)、LockSupport.park(this)、LockSupport.unpark()、t.interrupt()、Thread.yeild()、等JDK原生线程间协调的API。有如下关键点:
volatile Thread runner ; // 任务的真正执行线程,用于防止出现多个执行线程同时执行这个任务的情况,当多个执行线程同时进入futureTask.run()方法后,会执行if(state!=NEW || !UNSAFE.casObjct(this, runnerOffset, null, Thread.current())) return false; ,即,如果runer不为null,说明已经有线程在执行这个任务,当前执行线程应退出。volatile int state; // 任务的状态,也是同UNSAFE.casInt来控制。 volatile WaitNode waiters; // 调用get()方法后的客户端线程t,如果任务还没被执行完毕,则客户端线程会被构造成一个WaitNode节点q,q.thread=t;,然后头插法插入waiters链表:s = waiters; UNSAFE.casObject(this, waitersOffset, q.next = s, q );并被阻塞
FutureTask实现了RunnableFuture,则其本身就是一个Runnable,可以通过Executor.execute?方法提交并执行。 这个Runnable的实现:run() { ... result = callable.call(); ... } FutureTask类是一个Runnable,肯定是要执行一个任务的,这个任务用成员变量 Callable<V\> callable; 表示; FutureTask类需要提供监控任务的状态的功能,这个功能用成员变量volatile int state; 来承载。 FutureTask提供的阻塞等待任务运行、以待结束后取任务结果的功能,是通过一个单链表volatile WaitNode waiters; 来实现的,当某线程调用get()方法后,就会将自身cas头插法加入waiters队列,然后调用 LockSupport.park()方法阻塞等待,直到run | cancel 这两个的方法执行过程中,调用LockSupport.unpark(t)唤醒waiters中所有阻塞等待的线程。 FutureTask类中定义了成员变量Object outcome ,用于存储计算结果、或者run方法执行过程中抛出的异常。 还定义了成员变量volatile Thread runner ,用于记录执行这个任务的线程。
run()方法是在任务被Executor执行时调用的:
- 首先,任务必须是处于state=NEW的状态,其它状态下说明任务已经开始执行或者执行结束了。
- 接着,调用
casRunner(this,runnerOffset, null, Thread.currentThread()) ,设置任务的执行线程runner为当前线程。因为有可能多个线程竞争执行同一个任务,因此必须用cas+volatile来更新 - 接着,调用
V v = callable.call(); ,执行任务真正的计算逻辑 - 如果call()方法成功执行完毕,则调用set()方法,先
casState(this, stateOffset, NEW, COMPLETING); 再outcome=v; ,最后putOrderedInt(this, stateOffset, NORMAL) ,最后遍历waiters,将所有阻塞等待获取任务运行结果的线程唤醒。 - 如果call()方法抛出了异常exception,则调用setException(e)方法,先
casState(this, stateOffset, NEW, COMPLETING ) ,再outcom=e. ,最后putOrderedInt(this, stateOffset, EXCEPTIONTAL) ,最后调用finishCompletion(),将waiters中所有等待获取任务结果的线程唤醒。
cancel( mayInterruptIfRunning )方法则是会根据业务逻辑被其它线程主动调用:
- 首先,任务状态必须处于state=NEW的状态;
- 接着,调用
casState( this, stateOffset, NEW, mayInterruptIfRunning? INTERRUPTING : CANCELLED ) - 接着,
if(mayInterruptIfRunning ) { try { runner.interrupt(); } finally { putOrderedInt(this, stateOffset, INTERRUPTED); } } ,将执行线程runner的中断状态置位。无论这个置位是否成功,都要将状态更新为INTERRPTED; - 最后,调用finishComplete()方法,唤醒waiters链表中所有等待获取执行结果的线程。
可以看出,run()方法执行过程中,state有可能出现两种路径的转变:
- 如果callable.call()成功执行完成,则调用set(v)设置state为: NEW -> COMPLETING -> NORMAL
- 如果callable.call()执行过程中抛出异常,则调用setException(e)设置state为: NEW -> COMPLETING -> EXCEPTIONAL
而cancel(mayInterrupted)的执行,state也会出现两种路径的转变: - 如果mayInterrupted==true,则state为: NEW -> INTERRUPTING -> INTERRUPTED
- 如果mayInterrupted==false,则state为: NEW -> CANCELLED
无论run()调用set(v)还是setException(e),这两个方法最后都会调用finishCompletion()唤醒waiters中所有park在这个任务上的线程。cancel()方法也会调用finishCompletion()方法。
get()方法,则是会调用await()方法,在这个方法中线程会将自己加入waiters链表,然后park(this)阻塞等待,直到run()或者cancel()方法执行后,唤醒这些等待线程。
isDone()方法:只要state!=NEW,就返回true。因为非NEW状态,要么是call方法已经执行完(不会再阻塞了),要么是cancel()了。
|