既然要说异步编程,肯定首先得知道啥是异步、啥是同步,我在文章同步和异步、阻塞和非阻塞的区别中已经讲过了,此处就不再赘述了。
一、FutureTask
1、FutureTask的通俗解释
有一天你饿了,想吃饭,但是你不会做,这时候你就跟你妈说:妈,我饿了,我要吃饭。然后你妈就去厨房做饭,同时你就去看球赛了(但是你不想被打扰,就跟你妈说,饭做好了贴个墙纸就好了)。你妈把饭做好并端到餐桌上了,并贴了个墙纸,也不告诉你,然后你饿的受不了了,看了一下墙,已经有贴纸了,你就去餐桌上吃饭了,如果没有贴纸你就继续看球赛。FutureTask就是这张墙纸。
当然,你要真这么干,估计你妈会打死你。
一句话总结FutureTask:FutureTask也即代表了异步执行的任务
2、FutureTask标准用法
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<?> future = executorService.submit(() -> {
System.out.println("start");
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end");
});
System.out.println(future.isDone());
future.get(2, TimeUnit.SECONDS);
if (!future.isDone()) {
future.cancel(true);
}
}
3、FutureTask源码
public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
(1)setException(执行异常)
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
finishCompletion();
}
}
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
done();
callable = null;
}
(2)set(执行成功)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
finishCompletion();
}
}
(3)cancel
FutureTask中还有一个很重要的方法——cancel,用于取消任务执行。
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally {
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
注意:FutureTask如果不是在线程池中运行,那么调用cancel方法可能会导致业务线程在后面执行业务的时候响应了不属于它自己的中断,因为这个中断是被调用线程由于标志FutureTask线程把任务执行完毕,而不是用于中断业务的执行。
(4)handlePossibleCancellationInterrupt
在FutureTask的源码中Run方法里,finally中会调用handlePossibleCancellationInterrupt处理中断
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();
}
|