Future、FutureTask、CompletableFuture简介
历史背景
-
Future、FutureTask在java1.5版本提出,用于实现异步计算; -
ListenableFuture由Google Guava工具包提供的Future扩展类,便于异步计算; -
紧随ListenableFuture,在JDK1.8提出CompletableFuture,是对Future的拓展,便于异步计算;
Future
Future是一个接口,方法如下:
- get方法,用于获得异步调用返回值
- isCancelled方法,用于检查执行是否被取消
- isDone方法,用于判断是否执行是否完成
- cancel方法,用于取消执行
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
Future<String> submit = threadPoolExecutor.submit(() -> {
System.out.println("123");
return "1";
});
submit.get();
FutureTask
实现了RunnableFuture接口(RunnableFuture接口继承Runnable和Future接口),拥有两个构造方法如下;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
因为继承于Runnable所以可以直接交给线程池执行;如果使用FutureTask(Runnable runnable, V result) 构造方法可以自己指定返回值
FutureTask<String> hello = new FutureTask<>(() -> {
System.out.println("hello");
}, "123");
threadPoolExecutor.execute(hello);
assertEquals("123",hello.get());
FutureTask小结
FutureTask能很好的应用于有返回的异步调用;但是如果出现如下需求时则显得捉襟见肘:
- 无法手动完成:当调用远程服务时,如果发现远程服务出现问题,你需要将最近一次正常结果返回;这时使用Future就无法满足该需求。
- 无法添加回调方法:当调用远程服务结束后需要调用其它方法时,如果使用Future,则需要不断循环调用isDone方法判断是否完成;然后调用get获得结果,接着调用其它方法。
- 无法将多任务合并获得结果:当需要并行调用多个远程服务时,在获得返回结果时需要不断循环调用各future的isDone方法。
- 没有异常处理:Future API没有提供异常处理方法。
CompletableFuture
CompletableFuture是在Java1.8提出,实现了Future接口和CompletionStage接口。其中CompletionStage中包含多种处理方法用于异步计算、异常处理和计算结合等等,接下来将使用CompletableFuture解决上述问题。
问题一:无法手动完成
CompletableFuture<String> completableFuture = new CompletableFuture();
completableFuture.complete("ok");
assertEquals(completableFuture.get(), "ok");
CompletableFuture<String> completableFuture1 = new CompletableFuture();
new Thread(() -> {
completableFuture1.complete("ok");
}).start();
assertEquals(completableFuture1.get(), "ok");
completableFuture可调用complete方法手动完成,否则get方法将阻塞直到任务完成(计算完成或调用了completableFuture.complete("ok"); )
小疑问:如果completableFuture调用了complete方法,原有线程还会继续执行吗?执行完成后的结果会修改complete设置的结果吗?
只要线程没结束,会继续执行!执行完毕后不会影响原有结果;
举例如下:
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("运行了异步运行方法" + Thread.currentThread().getName());
return "1231";
}
});
Thread.sleep(200L);
System.out.println("complete 之前");
future.complete("complete");
System.out.println("complete 之后");
assertEquals(future.get(), "complete");
Thread.sleep(4000L);
assertEquals(future.get(), "complete");
System.out.println("等待sout完成后,再次调用get");
运行结果:
complete 之前
complete 之后
运行了异步运行方法ForkJoinPool.commonPool-worker-1
等待sout完成后,再次调用get
在complete之后 日志打印以后,还打印了异步线程的名称,这证明主线程调用sout.complete(“123”)方法后异步线程还在继续运行。但是如果去掉睡眠200毫秒,则会出现supplyAsync异步方法不会运行。
问题二:无法添加回调方法
CompletableFuture的thenApply方法可为异步方法添加一个回调方法,在CompletableFuture.supplyAsync 中的异步方法执行完毕后,会立即执行thenApply中的同步函数。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "123";
}).thenApply((string) -> {
System.out.println(Thread.currentThread().getName());
return string + "123";
});
System.out.println("start get");
System.out.println(future.get());
注意:thenApply中的代码正常情况会被主线程执行,如果异步方法执行时间较长 ,thenApply中的方法会由CompletableFuture线程执行!
问题三:无法将多任务合并获得结果
CompletableFuture的静态方法allOf可以监控多个任务,使用方法如下:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "234");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "345");
CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future, future1,future2);
completableFuture.get();
如果只需要任意一个任务结束就返回?
CompletableFuture的静态方法anyO可以监控任意一个CompletableFuture完成,并立即执行后续方法,代码如下:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "234");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "345");
CompletableFuture<Object> objectCompletableFuture = CompletableFuture.anyOf(future, future1, future2);
completableFuture.get();
当任何一个CompletableFuture完成后,completableFuture.get() ,都会立刻返回结果,否则主线程会一直阻塞
问题四:没有异常处理
CompletableFuture处理异常可采用handle方法来处理,该方法无论是否异常都会调用,使用方法如下:
CompletableFuture<String> handle = CompletableFuture.runAsync(() -> {
System.out.println("123");
}).handle(new BiFunction<Void, Throwable, String>() {
@Override
public String apply(Void unused, Throwable throwable) {
if (null == throwable) {
return "ok";
} else {
return "error";
}
}
});
assertEquals("ok",handle.get());
如果只需要在发生异常时调用?
可采用exceptionally方法来处理,使用方法如下:
CompletableFuture<String> exceptionally2 = CompletableFuture.supplyAsync(() -> {
int i = 1 / 0;
return "123";
}).exceptionally(new Function<Throwable, String>() {
@Override
public String apply(Throwable throwable) {
return "error";
}
});
assertEquals(exceptionally2.get(), "error");
其他功能点
runAsync方法和supplyAsync方法有何区别?
runAsync是不带返回值的异步方法;supplyAsync是带返回值的异步方法。
异步调用时,可以指定线程池吗?默认使用的什么线程?
默认使用的```ForkJoinPool.commonPool()``线程池;也可以在supplyAsync或runAsync中指定线程池。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
如果需要等待两个任务完成,然后运行其它方法,如何处理?
首先可以使用allof方法,此外CompletableFuture还提供了thenCombine方法,该方法可以等待两个异步方法结束后立刻运行传入的函数,具体使用方法如下:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123");
CompletableFuture<String> other = CompletableFuture.supplyAsync(() -> "456");
CompletableFuture<String> future = future1.thenCombine(other, (a, b) -> {
return a + b;
});
assertEquals(future.get(), "123456");
如果回调方法不需要返回值,使用哪个方法?
thenAccept()可接受一个入参,没有返回值;thenRun()不接受入参,也没有返回值;示例代码如下:
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
return "123";
}).thenRun(new Runnable() {
@Override
public void run() {
}
});
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.supplyAsync(() -> {
return "123";
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
thenApply可以添加回调方法,thenCompose也可以添加回调方法,他们有什么区别?
具体详见代码如下,thenCompose接受的函数的返回值必须是CompletionStage的子类,而thenApply接受的函数的返回值可以是任意类型,但是thenApply会将该返回值再次包装到一个新的CompletableFuture中,所以下面两个代码都返回的CompletableFuture类型的数据。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "123").thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
return CompletableFuture.supplyAsync(() -> {
return "456";
});
}
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "123").thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return "456";
}
});
总结
- CompletableFuture初始化使用supplyAsync、runAsync或其他静态方法
- 如果回调函数不需要返回值且接受传参使用thenAccept方法
- 如果回调函数不需要返回值不接受传参使用thenRun方法
- 如果结合两个有依赖的futures使用thenCompose方法
- 如果结合两个独立的futures使用thenCombine方法
- 如果需要结合多个futures使用allof方法
- 如果回调函数不在主线程运行,则调用各自的 Async方法 例如thenApplyAsync、thenAcceptAsync、thenCombineAsync等
- 使用手动调用complete,CompletableFuture中的异步方法还会运行;且在complete调用后会立即运行该CompletableFuture接下来的操作
参考资料
completablefuture指导
https://www.baeldung.com/java-completablefuture
更加详细的举例
https://www.callicoder.com/java-8-completablefuture-tutorial/
关于listenableFuture的讲解
https://michaelbfullan.com/the-joys-of-guava-listenablefuture/
客官且慢,点赞、收藏+关注 谢谢~
|