了解CompletableFuture
为什么要引入CompletableFuture
为了获得最佳性能表现,可能就需要仔细规划业务流程中的各个步骤的编排。而Java8就提供了这样一个即用容器来连接一系列任务。
CompletableFuture 如何来构建一个任务链呢?这里就不先介绍API了,之前看应用场景。
目前有一个需求:
? 我们需要从 数据中台中获取 安徽、江苏、浙江三省内所有企业信息 及其 子公司 的信息,并按 **子公司数量 **进行排序。
我从观察到的代码(finchinaApp)中来看,如果遇到这种类似的需求,基本会采用多线程池 并发的查询安徽、江苏、浙江三省的公司,然后线程池内线程都结束执行,再对所有的结果进行排序。
当然在这种场景下,使用FutureTask 或者 线程池 +
CountDownLatch countDownLatch = new CountDownLatch(3);
new Thread(() - >{
countDownLatch.countDown();
}).start();
new Thread(() - >{
countDownLatch.countDown();
}).start();
new Thread(() - >{
countDownLatch.countDown();
}).start();
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
MultiTaskScheduler multiTaskScheduler = new MultiTaskScheduler();
multiTaskScheduler.add(()->{
})
multiTaskScheduler.add(()->{
})
multiTaskScheduler.add(()->{
})
try {
multiTaskScheduler.start(true, 10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
multiTaskScheduler.shutdown();
}
可以观察到,该使用场景下,不需要引入CompletableFuture。如果是以下场景呢?
- 使用多线程 从 redis中获取 全国各个省份内所有公司的信息 ,如果reids中缺少某个省份的信息,则选择从数据库中获取,并将获取的数据异步更新到redis中。在以往我们就需要通过调用线程池submit方法将Future返回的结果 存放在一个List中,然后通过for循环get()得到结果,然后通过结果的不同再进行不同的处理。
- 校验省内中所有公司的税务报表,如果某个公司校验过程中出现异常,则记录日志,并且中止整个省份所有公司的税务校验。
ExecutorService threadPool = Executors.newFixedThreadPool(10);
List<Future<Boolean>> result = new ArrayList<>();
result.add(threadPool.submit(() -> {
System.out.println("执行f1");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
}));
result.add(threadPool.submit(() -> {
System.out.println("执行f2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}));
for(int i=0;i<result.size();i++){
System.out.println(result.get(i).get());
}
简单说明:
1. 如果业务中每个线程需要线程结束后立即进行处理,而非等待其他线程都结束再进行操作,则需要使用CompletableFuture。
2. 如果 业务1 和 业务2 并行执行,任意一个任务失败/成功,则标志整个业务失败/成功,则需要使用CompletableFuture
3. 如果 每个线程中有较为复杂的异步任务编排(组合多个Future并行执行结果),则建议使用CompletableFuture,因为可以简化代码,同时极其优雅。
CompletableFuture的优点就在于 :
? ①有回调函数,可以在执行完成后自动调用回调处理逻辑;
? ②其提供的四十多个API可以优雅的编排任务链(提供异步任务完成后的链式调用);
? ③具备异常处理机制
CompletableFuture的使用
使用误区1:CompletableFuture所有的方法都有额外以Async结尾的方法。此类方法的作用在于不是说 后续任务可以在前序任务还没执行完就可以运行。举例:
- thenRun(Runnable action) 当上一个任务结束后,此任务沿用上一个任务的线程池。
- thenRunAsync(Runnable action) 当上一个任务结束后,此任务使用默认的ForkJoinPool线程池
- thenRunAsync(Runnable action,Executor executor),当上一个任务结束后,此任务使用自定义线程池(推荐)
/**
建议用自定义线程池。CompletableFuture在未指定线程池的情况下,默认使用 ForkJoinPool
而ForkJoinPool中线程是守护线程
*/
ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture cf1 = new CompletableFuture().supplyAsync(() -> {
//执行业务1
return c;
},threadPool).thenApplyAsync(c -> {
//执行业务2
return c;
},threadPool).thenAcceptAsync(c ->{
c.stream().sorted(Comparator.comparing(Company::getSubsidiaries));
});
|