前言
我们在精通多线程,没用过CompletableFuture(二)中介绍了创建CompletableFuture的四种方式,我们知道了如何通过CompletableFuture来处理异步任务,之前我们对比CompletableFuture和Future说到,CompletableFuture可以通过异步回调来通知异步任务执行完成,这里我们就一起看看CompletableFuture是如何做到这点的。
ThenApply/ThenApplyAsync
对于ThenApply/ThenApplyAsync,讲太多,不如几行代码来的快,我们先上Demo来演示
@Test
public void testThenApply() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
return "爱琴孩";
}, executorService).thenApply((result) -> {
System.out.println("异步任务执行完成,获取到结果" + result);
System.out.println("当前线程" + Thread.currentThread().getName());
return "Hello " + result;
});
}
执行结果如下
开启异步pool-1-thread-1
异步任务执行完成,获取到结果爱琴孩
当前线程main
上面可以看到,我们可以直接在CompletableFuture对象上直接调用thenApply()来处理?CompletableFuture任务的返回结果,注意观察执行结果,thenApply调用线程为当前线程,如果我们想用新的线程来处理异步任务返回的结果,ThenApplyAsync就ok了。
@Test
public void testThenApplyAsync() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
return "爱琴孩";
}, executorService).thenApplyAsync((result) -> {
System.out.println("异步任务执行完成,开启新线程,对结果进行后续处理" + result);
System.out.println("当前线程" + Thread.currentThread().getName());
return "Hello " + result;
});
}
执行结果如下
开启异步pool-1-thread-1
异步任务执行完成,开启新线程,对结果进行后续处理爱琴孩
当前线程ForkJoinPool.commonPool-worker-1
可以看到,已经使用ForkJoinPool.commonPool中的线程来处理后续结果了,当然,如果想使用自定义线程池中线程也是ok的,直接使用thenApplyAsync的重载方法,这里就不演示了。
ThenAccept/TheRun
同样先上demo
@Test
public void testThenAccept() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
return "爱琴孩";
}, executorService).thenAccept((result) -> {
System.out.println("异步任务执行完成,对结果进行后续处理" + result);
});
}
可以看到ThenAccept,是获取异步任务的返回值,进行消费处理,但是自身没有返回值
@Test
public void testThenRun() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
return "爱琴孩";
}, executorService).thenRun(() -> System.out.println("异步任务执行完成,进行后续处理"));
}
thenRun既不需要入参,也不需要返回值。
Exceptionally
CompletableFuture对于异步任务中的异常处理也比较方便
@Test
public void testExceptionally() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
int i = 1 / 0;
return "爱琴孩";
}, executorService);
completableFuture.thenApply((result) -> {
System.out.println("异步执行结果如下"+result);
return result;
});
completableFuture.exceptionally((e) -> {
e.printStackTrace();
return null;
});
}
执行结果如下
开启异步pool-1-thread-1
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: / by zero
at com.example.study.FeatureTest.lambda$testExceptionally$16(FeatureTest.java:172)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
WhenComplete
上面的exceptionally方法是专门处理异常信息的,而whenComplete是既可以接收正常处理结果,也可以处理异常信息
@Test
public void testWhenComplete() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
int i = 1 / 0;
return "爱琴孩";
}, executorService).whenComplete((a, exception) -> {
if (exception != null) {
System.out.println("异步任务处理异常");
exception.printStackTrace();
} else if (a != null) {
System.out.println("异步任务处理成功");
}
});
}
可以看到whenComplete的第一个参数是异步任务正常处理结果,第二个参数是任务异常处理结果,我们可以在逻辑代码中进行判断处理。
Handle
Handle跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了
@Test
public void testHandle() throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
System.out.println("开启异步" + Thread.currentThread().getName());
int i = 1 / 0;
return "爱琴孩";
}, executorService);
CompletableFuture<String> completableFuture2 = completableFuture1.handle((a, b) -> {
if (b != null) {
System.out.println("异步任务处理异常");
b.printStackTrace();
return "error";
} else if (a != null) {
System.out.println("异步任务处理成功");
return "success";
}
return null;
});
String result=completableFuture2.get();
System.out.println(result);
}
执行结果如下
异步任务处理异常
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArithmeticException: / by zero
at com.example.study.FeatureTest.lambda$testHandle$21(FeatureTest.java:209)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
... 3 more
error
|