Future 用法详解
前言
为什么出现Future机制
常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。
这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。
从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。
因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果。
Future 用法详解
用法很简单
System.out.println(" main start ");
FutureTask<Integer> integerFutureTask = new FutureTask<>(new TestA());
new Thread(integerFutureTask).start();
System.out.println(" integerFutureTask ...");
Integer integer = integerFutureTask.get();
System.out.println(integer);
System.out.println(" main end ");
class TestA implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(" call start ");
Thread.sleep(10000);
System.out.println(" call end ");
return 1;
}
}
get时候会一直阻塞获取
当然也可以设置超时时间
public V get(long timeout, TimeUnit unit)
Future 简单原理
使用起来很简单,具体原理我们来研究一下
我们都知道Thread只能运行Runable接口
那返回值是怎么返回的呢?
FutureTask<Integer> integerFutureTask = new FutureTask<>(new TestA()); 此行代码可以看到FutureTask实现了 RunnableFuture接口
继续看RunnableFuture接口继承了我们的Runnable接口 还继承了一个 Future接口 Future接口 定义了返回值的一些方法
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
原理流程如下
第一步
初始化任务,设置线程状态
我们可以看到线程几个状态以及从开始到结束的一些状态
outcome 是输出的返回值 runner 表示正在运行的线程 waiters 表示正在等待的线程
第二步
run方法运行
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
设置结果将线程状态设置,同时设置值 outcome
第三部 get 获取结果
可以看到当状态没有大于等于 COMPLETING 会阻塞
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
返回值 强转类型
线程池返回原理
提交有参数返回任务
提交代码
也是交给 FutureTask 去执行
自定义返回任务值
可以参考这个实现自己的提交
获取数据接口
public interface Future<T> {
T get() throws InterruptedException;
}
做事情接口
public interface FutureTask<T> {
T call();
}
异步提交处理任务
public class FutureTaskService {
public <T> Future<T> submit(final FutureTask<T> futureTask){
AysFutureTask<T> aysFutureTask = new AysFutureTask();
new Thread(()->{
T call = futureTask.call();
aysFutureTask.done(call);
}).start();
return aysFutureTask;
}
}
测试
public class MainTest {
public static void main(String[] args) throws InterruptedException {
FutureTaskService futureTaskService = new FutureTaskService();
Future<String> submit = futureTaskService.submit(() -> {
return doThing();
});
System.out.println(" -------- 返回 ------- ");
System.out.println(" --------- 做其他事情 ----- ");
System.out.println(" --------- do other ----- ");
System.out.println(submit.get());
}
private static String doThing(){
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return " 做事情...";
}
}
测试结果
可以看到我们先提交的任务,前面其他事情处理完后之后再获取结果,这期间这个任务就在执行,同一时间执行多个任务,在最后的时刻阻塞获取结果。
最后
FutureTask是Future的具体实现。
FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Future 和 Runnable 接口。
Thread 可以提交 FutureTask 实际是执行 call方法 然后使用cas比较线程状态等待获取结果
Future继承图
当然 Future 还有其他扩展用法,如 CompletableFuture等
|