CompletableFuture用法详解
前言
JDK汇总已经提供了异步编程的类Future<T> ,为什么还要CompletableFuture 这个类呢?
我们通常在使用Future接口异步编程时,获取结果都需要阻塞获取。主线程中会阻塞,其实最好的结果是让它执行完结果后通知我们获取。
还有一种情况,当我们需要多个异步结果嵌套时候,Future这个接口就就不能满足我们的实用场景了,可能需要我们手动在里面进行嵌套。
对此,从JDK 8开始,在Concurrent包中提供了一个强大的异步编程工具 CompletableFuture。在JDK8之前,异步编程可以通过线程池和Future来实现。
CompletableFuture
CompletableFuture接口实现了Future 接口,对其功能进行了扩展,使得其功能更加强大,可以事件通知,嵌套等操作。 data:image/s3,"s3://crabby-images/850bb/850bbaf7e74fd48fc43d467141c58fff243d81f1" alt="在这里插入图片描述" CompletionStage接口提供了一些方法
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,
Executor executor);
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action) {
return biAcceptStage(null, other, action);
}
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
等等这里列举了一部分接口
这里介绍几个最常用的接口
runAsync 与 supplyAsync
可以指定线程池,默认是 data:image/s3,"s3://crabby-images/000d4/000d401ec351db9e4123ec6255a7fa5780e222bc" alt="在这里插入图片描述" 构造 data:image/s3,"s3://crabby-images/a4d87/a4d87dacbf99e0a551b5fe6cf43022820c68f18d" alt="在这里插入图片描述"
有时候我们异步执行完结果后,可以额外通知做一些事情。
whenComplete 表示执行完后通知事件
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(null, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action) {
return uniWhenCompleteStage(asyncPool, action);
}
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor) {
return uniWhenCompleteStage(screenExecutor(executor), action);
}
例如
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture.runAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========== do some ting ========== ");
},executor).whenComplete((v,throwable)->{
System.out.println(" 执行结束 通知 .... " + v );
});
执行如下 data:image/s3,"s3://crabby-images/6b34e/6b34eed2714e4621d2ed3894a04a3b9fb6d1511a" alt="在这里插入图片描述"
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========== do some ting ========== ");
return "supplyAsync str ";
}, executor).whenComplete((v, throwable) -> {
System.out.println(" 执行结束 通知 .... " + v);
});
执行结果 data:image/s3,"s3://crabby-images/c80c9/c80c9e87af7a97038b428829ac30cbcf17998367" alt="在这里插入图片描述"
thenRun、thenAccept和thenApply
有时候异步执行完任务会,再做一些事情,也可能根据返回值做一些操作。 此时可以用到这几个事件。
对于 Future,在提交任务之后,只能调用 get()等结果返回;但对于 CompletableFuture,可以在 结果上面再加一个callback,当得到结果之后,再接着执行callback。
thenRun
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========== do some ting ========== ");
}, executor).thenRun(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thenRun : 任务执行结束之后执行的语句");
}).whenComplete((v, throwable) -> {
System.out.println(" 执行结束 通知 .... " + v);
});
thenAccept
这里用的无返回值, 会是Null, 返回值可以用 supplyAsync 方法
CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========== do some ting ========== ");
}, executor).thenAccept(new Consumer<Void>() {
@Override
public void accept(Void unused) {
System.out.println(" thenAccept : 执行任务完后获取前面执行的结果 = " + unused);
}
}).whenComplete((v, throwable) -> {
System.out.println(" 执行结束 通知 .... " + v);
});
thenApply
thenApply 可以链式计算,执行结束可以接着thenApply操作。
CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" =========== do some ting ========== ");
return " supplyAsync : thenApply(function)";
}, executor).thenApply(new Function<String, Object>() {
@Override
public String apply(String unused) {
System.out.println( " 接收上面返回值 接着计算 unused :" + unused);
return unused + " :apply : " + 1234567;
}
}).whenComplete((result, throwable) -> {
System.out.println(" 执行结束 通知 .... " + result);
});
thenCompose 与 thenCombine
thenApply接收的是一个Function,但是这个Function的返回值是一个通常的基 本数据类型或一个对象,而不是另外一个 CompletableFuture。如果 Function 的返回值也是一个 CompletableFuture,就会出现嵌套的CompletableFuture。
没有 thenCompose 时候嵌套 任务,用 thenApply来实现 thenApply中有个函数接口,一个是T入参(也就是任务结果) 一个是返回值(最后嵌套里的返回值)。 data:image/s3,"s3://crabby-images/ab155/ab155e3b7b7f7bc32ef815372973cebc2397489b" alt="在这里插入图片描述" 会继续返回我们的 CompletableFuture data:image/s3,"s3://crabby-images/8b96c/8b96c1810905f246fdbd75fb35d6459151f8c5d4" alt="在这里插入图片描述"
CompletableFuture<CompletableFuture<Integer>> completableFutureCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(" =========== do some ting ========== ");
return " 没有 thenCompose 时候嵌套 任务 ";
}, executor).thenApply(new Function<String, CompletableFuture<Integer>>() {
@Override
public CompletableFuture<Integer> apply(String s) {
return CompletableFuture.supplyAsync(() -> {
return s.length();
});
}
}).whenComplete((result, throwable) -> {
System.out.println(" 最后通知返回," + result);
System.out.println(" 最后通知返回异常 ," + throwable);
});
CompletableFuture<Integer> integerCompletableFuture = completableFutureCompletableFuture.get();
System.out.println("嵌套返回: " + integerCompletableFuture.get());
可以看到结果,需要获取2次 data:image/s3,"s3://crabby-images/81038/81038effc5e5a7ff654b81acccdec2b831cd1d73" alt="在这里插入图片描述"
thenCombine
可以通过 thenCombine 简化上面的嵌套任务,返回值是一个非嵌套的CompletableFuture
CompletableFuture<Integer> integerCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println(" =========== do some ting ========== ");
return " thenCombine(简化嵌套任务) ";
}, executor).thenCompose(new Function<String, CompletionStage<Integer>>() {
@Override
public CompletionStage<Integer> apply(String s) {
System.out.println(" thenCombine(简化嵌套任务) 获取上一步值: " + s);
return CompletableFuture.supplyAsync(() ->
s.length()
);
}
}).whenComplete((result, throwable) -> {
System.out.println(" 最后通知返回," + result);
System.out.println(" 最后通知返回异常 ," + throwable);
});
System.out.println(" get : " + integerCompletableFuture1.get());
这里结果我们只需要get一次 data:image/s3,"s3://crabby-images/4792d/4792d4260cac28414d3a35719cba233ac6d2cec8" alt="在这里插入图片描述"
thenCompose
thenCompose
第1个参数是一个CompletableFuture类型,第2个参数是一个方法,并且是一个BiFunction,也就 是该方法有2个输入参数,1个返回值。 从该接口的定义可以大致推测,它是要在2个 CompletableFuture 完成之后,把2个 CompletableFuture的返回值传进去,再额外做一些事情。
data:image/s3,"s3://crabby-images/5b569/5b5692f6ab326dd770dc255e66b6a768f871b8e4" alt="在这里插入图片描述"
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(" thenCompose do something 1 ");
return "1---> result";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println(" thenCompose do something 1 ");
return "2---> result";
}), new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) {
System.out.println(" thenCompose 里处理任务 ");
return s + " | " + s2;
}
}).whenComplete((result, t) -> {
System.out.println("最后结果: " + result);
System.out.println("执行异常: " + t);
});
System.out.println(stringCompletableFuture.get());
可以看到最后我们将两个任务的结果拼起来 data:image/s3,"s3://crabby-images/54b79/54b79b1ed2dd5bb26427a3dcf5ebaf799ce9f275" alt="在这里插入图片描述"
allOf 与 anyOf
批量提交任务
//返回值不确定 所以都以void返回
CompletableFuture.allOf();
data:image/s3,"s3://crabby-images/6e82d/6e82dbf236d3cd180966d74b951636d3d994b0b2" alt="在这里插入图片描述"
//提交的任务都有返回值 所以都以Object 返回
CompletableFuture.anyOf();
data:image/s3,"s3://crabby-images/958eb/958eb193387bd76cd96da60f66f80563ff74e622" alt="在这里插入图片描述" allOf的返回值是CompletableFuture类型,这是因为每个传入的CompletableFuture的返回 值都可能不同,所以组合的结果是无法用某种类型来表示的,索性返回Void类型。
anyOf 的含义是只要有任意一个 CompletableFuture 结束,就可以做接下来的事情,而无须像 AllOf那样,等待所有的CompletableFuture结束。
最后
提交给CompletableFuture执行的任务有四种类型:Runnable、Consumer、Supplier、Function。 data:image/s3,"s3://crabby-images/b6aa1/b6aa1f2c069bead8b5eaa51cd58fe6ca04eee2f5" alt="在这里插入图片描述" runAsync 与 supplierAsync 是 CompletableFuture 的静态方法;而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。 初始的时候没有CompletableFuture对象,也没有参数可传,所以提交的只能是Runnable或者Supplier,只能是静态方法;
通过静态方法生成CompletableFuture对象之后,便可以链式地提交其他任务了,这个时候就可以提交Runnable、Consumer、Function,且都是成员方法。
|