前言
我们知道线程Runnable接口是无法获取线程执行的返回值的,需要用另一个接口Callable接口是可以获取线程返回值的。
区别:
而thread类的构造方法没有接受类型是Callable参数的,所以需要对Callable进行一些封装才可以进行执行,也就是FutureTask。
1、FutureTask结构
1.1、继承结构
可以发现FutureTask其实是实现了Runnable接口的
1.2、构造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
两个构造器都保证了初始时状态为NEW。除了可以接受Callable 之外,还可以接受Runnable ,但也是马上通过适配器模式把Runnable 包装成一个Callable 而已。
1.3、状态
FutureTask的重点在于对task(Callable.call() )的执行的管理,而FutureTask通过一个volatile的int值来管理task的执行状态。
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
这个state就和AQS的state一样,是最重要的属性,因为state就时刻反映了task的执行状态。
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
状态转移如下:
初始状态是NEW,构造方法可以体现,状态改变是不可逆的,中间状态存在时间很短,通常是通过CAS来改变的,若成功其实直接就改变成最终状态了,只要状态不是NEW的话,就可以认为生产者执行task已经完毕。
-
set(V v)使得NEW -> COMPLETING -> NORMAL。 -
setException(Throwable t)使得NEW -> COMPLETING -> EXCEPTIONAL。 -
cancel(boolean mayInterruptIfRunning)可能有两种状态转移:
1.4、消费者链表
对同一个FutureTask 对象调用get 的不同线程的都属于消费者,当生产者还没有执行完毕task时,调用get 会阻塞。而做法是将消费者线程包装成一个链表节点,放到一个链表中,等到task执行完毕,再唤醒链表中的每个节点的线程。这种做法类似于AQS的条件队列和signalAll 。其实就是多个线程获取了同一个FutureTask .get的执行结果,需要一个阻塞唤醒的机制。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
消费者被包装成了栈,先进后出,可以发现是一个单链表,结构相对来说比较简单。
1.5、成员属性
private volatile int state;
private Callable<V> callable;
private Object outcome;
private volatile Thread runner;
private volatile WaitNode waiters;
-
outcome是Object类型,可以存任何类型对象。这样既可以存泛型类型V,也可以存异常对象。 -
当调用new Thread(FutureTask对象).start()时,生产者线程便创建并开始运行了,并且会在FutureTask#run()的刚开始就把生产者线程存放到runner中。 -
当调用FutureTask对象.get()时,如果task还未执行完毕,当前消费者线程会被包装成一个节点扔到栈中去。
2、实现Runnable接口的run方法
public void run() {
1、先判断状态是否为NEW,并且设置runner属性为当前线程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
2、再次判断状态是否为NEW
if (c != null && state == NEW) {
V result;
2.1、线程执行成功标志
boolean ran;
try {
2.3、到这儿开始执行线程了
result = c.call();
2.4、执行完,设置标志为true
ran = true;
} catch (Throwable ex) {
3、抛出异常,设置返回值,以及状态转变这儿是NEW -> COMPLETING -> EXCEPTIONAL
result = null;
ran = false;
setException(ex);
}
if (ran)
4、成功设置值状态时NEW -> NORMAL
set(result);
}
} finally {
5、最终把持有线程设置为null
runner = null;
int s = state;
5.1、是否中断过,设置state最终状态为INTERRUPTED
消费者线程可能使得task取消,其中一种状态转移是NEW -> INTERRUPTING -> INTERRUPTED
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
if (c != null && state == NEW) 执行有两种结果:
- 顺利执行完task,然后调用
set(result) 。 - 执行task途中抛出异常,然后调用
setException(ex) 。
protected void set(V v) {
1、这就是前面说的中间状态存在时间短暂,只是判断一下cas成功COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
2、具体的返回值,设置到outcome
outcome = v;
3、设置最终状态NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
4、唤醒所有被阻塞的线程
finishCompletion();
}
}
protected void setException(Throwable t) {
1、同理,这儿也是中间状态COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
2、最终状态
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
上面两个函数都调用了finishCompletion:
private void finishCompletion() {
1、从head节点开始遍历,退出内循环时检查waiters是否为null
for (WaitNode q; (q = waiters) != null;) {
2、头节点waiters直接设置为null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
3、自旋唤醒所有的线程,并清除thread缓存
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
3.1、唤醒线程,会在get方法的awaitDone处被唤醒。
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
此函数负责唤醒所有消费者线程,原理很简单,内层循环遍历链表的每个节点,唤醒每个节点的线程对象。而外层循环在刚开始时,负责给局部变量q 赋值,在退出外层循环时,负责检查waiters 是否已经被赋值为null(当然检查结果肯定成立)。
最后还会调用done 函数,但这只是空实现,这是用来给使用者拓展用的,可以让生产者线程在执行完毕前多做一点善后工作。
3、Future接口方法(get等)
3.1、get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
1、状态<=COMPLETING,还没执行完,没变成最终状态
if (s <= COMPLETING)
1.1、没执行完,判断是否进入栈进行阻塞
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
1、已经超时了还没开始执行或没执行完,直接报错超时
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
就是判断线程是否为最终状态,然后进行设置返回值。下面来看awaitDone方法。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
1、设置死亡时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
1.1、判断是否入栈的标志
boolean queued = false;
2、又是一个自旋,前面分析过AQS其实这儿就简单很多了
for (;;) {
2.1、检查线程是否被中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
2.2、获取task最新的状态
int s = state;
2.3、s > COMPLETING说明已经变成最终状态,清除q的thread直接返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
2.4、中间状态,存在短暂,所以再自旋一下尝试一下
else if (s == COMPLETING)
Thread.yield();
2.5、这时,说明线程还是NEW状态,还没开始执行或执行中,新建节点
else if (q == null)
q = new WaitNode();
2.6、入栈
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
2.7、超时的get方法走这儿,若没有超时则进行阻塞
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
2.8、直接阻塞,finishCompletion方法会在这儿唤醒
else
LockSupport.park(this);
}
}
来看一下removeWaiter(q) 是怎么移除节点的,注意实参是可能为null的,当q局部变量还没创建,当前线程就被中断时。
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) {
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null)
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
其实就是把node的thread为null的节点移除掉。
- 如果发现一个取消节点是首节点,那么使得head下移一个节点即可。
- 如果发现一个取消节点不是首节点,那么将pred -> q -> s变成pred -> s(执行pred.next = s)。如下图所示,从链表任意节点出发,都不能到达这个
q 节点。
最终进入report方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
3.2、cancel、isCancelled
直到执行set 或setException 之前,都在正常执行task中,而既然没有执行这两个函数,说明这段时间state还是为NEW的。
public boolean cancel(boolean mayInterruptIfRunning) {
1、先判断state状态是否还为NEW,若不为NEW是无法取消的,直接返回
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
2、是否考虑中断线程,也就是设置中断标志,而是否可以检测到还是要看真正的task代码内是否检测了中断状态。
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
2.1、最终状态
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
3、同样唤醒所有的等待线程。
finishCompletion();
}
return true;
}
而cancel函数执行前提就是state是NEW,在生产者线程执行set或setException之前,都是可以CAS成功的。
如果消费者是在生产者线程执行run方法的if (c != null && state == NEW)之前就执行了cancel函数,可以取消task成功。
如果消费者是在生产者线程执行run方法的if (c != null && state == NEW)之后才执行的cancel函数,无法取消task。
-
如果参数是false,state从NEW修改为CANCELLED。但修改state,并不能使得生产者线程运行终止。 -
如果参数是true,state从NEW修改为INTERRUPTING,中断生产者线程后,再修改为INTERRUPTED。我们知道,中断一个正在运行的线程,线程运行状态不会发生变化的,只是会设置一下线程的中断状态。也就是说,这也不能使得生产者线程运行终止。除非生产者线程运行的代码(Callable.call())时刻在检测自己的中断状态。
那你可能会问,这种情况既然不能真的终止生产者线程,那么这个cancel函数有什么用,其实还是有用的:
-
如果参数为true,那么会去中断生产者线程。但生产者线程能否检测到,取决于生产者线程运行的代码(Callable.call())。 -
状态肯定会变成CANCELLED或INTERRUPTED,新来的消费者线程会直接发现,然后在get函数中不去调用awaitDone。 -
对于生产者线程来说,执行task期间不会影响。但最后执行set或setException,会发现这个state,然后不去设置outcome。
最后执行了finishCompletion函数,唤醒所有的消费者线程。
public boolean isCancelled() {
1、state >= CANCELLED说明被中断了,也就是取消成功
return state >= CANCELLED;
}
public boolean isDone() {
2、不为NEW,也就变成了最终状态,执行结束
return state != NEW;
}
这儿就比较简单,就是根据状态来判断的。
4、总结
-
FutureTask整合了Callable对象,使得我们能够异步地获取task执行结果。 -
执行FutureTask.run()的线程就相当于生产者,生产出执行结果给outcome。执行FutureTask.get()的线程就相当于消费者,它们会阻塞等待直到执行结果产生。 -
如果生产者线程已经开始执行Callable.call(),那么消费者调用cancel,实际上是无法终止生产者的运行的。
|