一?点睛
Future?接口可以创建出新的线程,并在新线程中执行异步操作。但是,?如果使用?Future?接口创建了多个线程,那么这些线程各自独立执行,不存在任务依赖关系,并且无法控制各个线程的执行步骤。
为了解决这种线程间的依赖关系,从?JDK 1.8?开始,Future?接口提供了一个新的实现类?CompletableFuture。CompletableFuture?不但可以创建出异步执行的线程,还可以控制线程的执行步骤,并且可以监控所有线程的结束时刻。例如,可以使用?CompletableFuture?创建?A?和 B?两个线程,然后规定?A 和?B?线程各需要执行 3个?不同的阶段,并且当?A和?B?全部执行完毕后,再触发某个方法。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor) {
return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor) {
return asyncRunStage(screenExecutor(executor), runnable);
}
}
其中,supplyAsync()?用于有返回值的任务,runAsync()?用于没有返回值的任务,
除了这些用于执行线程任务的方法外,CompletableFuture?还提供了多线程执行时的两种结束逻辑:allOf()?和?anyOf(),如下所述。
1?allOf()?方法会一直阻塞,直到线程池?Executor?中所有线程全部执行完毕。
2?anyOf()?方法会一直阻塞,直到线程池?Executor?中任何一个线程执行完毕。
二?实战
1?需求
有1、2、3、4?四个数字,现要求创建4个线程对这些数字进行处理,并要求每个线程必须按照以下步骤执行。
a?对4个数字的值各加上64,使之转为?A、B、C、D?对应的?ASCII(A:65、B:66、C:67、D:68)。
b?将?ASCII?值转为对应的?A、B、C、D?字符。
c?将转后的?A、B、C、D?加入一个集合中。
最后,还要求当?A、B、C、D 4个线程全部执行完毕后,打印出转换后的结果集。
2?代码
package concurrent;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
public static void main(String[] args) {
// 原始数据集
CopyOnWriteArrayList<Integer> taskList = new CopyOnWriteArrayList();
taskList.add(1);
taskList.add(2);
taskList.add(3);
taskList.add(4);
// 结果集
List<Character> resultList = new ArrayList<>();
//线程池,可容纳四个线程
ExecutorService executorService = Executors.newFixedThreadPool(4);
CompletableFuture[] cfs = taskList.stream()
// 第一阶段
.map(integer -> CompletableFuture.supplyAsync(
() -> calcASCII(integer), executorService)
// 第二阶段
.thenApply(i -> {
char c = (char) (i.intValue());
System.out.println("【阶段2】线程"
+ Thread.currentThread().getName() + "执行完毕,"
+ "已将int"
+ i + "转为了字符" + c);
return c;
})
// 第三阶段
.whenComplete((ch, e) -> {
resultList.add(ch);
System.out.println("【阶段3】线程" +
Thread.currentThread().getName() + "执行完毕," + "已将"
+ ch + "增加到了结果集" + resultList + "中");
executorService.shutdown();
})
).toArray(CompletableFuture[]::new);
// 封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();//future.get()
System.out.println("完成!result=" + resultList);
}
// 计算i的ASCII值
public static Integer calcASCII(Integer i) {
try {
if (i == 1) {
Thread.sleep(5000);
} else {
Thread.sleep(1000);
}
//数字 -> A-D对应的ascii
i = i + 64;
System.out.println("【阶段1】线程" + Thread.currentThread().getName()
+ "执行完毕," + "已将" + i
+ "转为了A(或B或C或D)对应的ASCII" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
3?测试结果
【阶段1】线程pool-1-thread-3执行完毕,已将67转为了A(或B或C或D)对应的ASCII67
【阶段1】线程pool-1-thread-4执行完毕,已将68转为了A(或B或C或D)对应的ASCII68
【阶段1】线程pool-1-thread-2执行完毕,已将66转为了A(或B或C或D)对应的ASCII66
【阶段2】线程pool-1-thread-3执行完毕,已将int67转为了字符C
【阶段2】线程pool-1-thread-4执行完毕,已将int68转为了字符D
【阶段2】线程pool-1-thread-2执行完毕,已将int66转为了字符B
【阶段3】线程pool-1-thread-3执行完毕,已将C增加到了结果集[C, D]中
【阶段3】线程pool-1-thread-4执行完毕,已将D增加到了结果集[C, D]中
【阶段3】线程pool-1-thread-2执行完毕,已将B增加到了结果集[C, D, B]中
【阶段1】线程pool-1-thread-1执行完毕,已将65转为了A(或B或C或D)对应的ASCII65
【阶段2】线程pool-1-thread-1执行完毕,已将int65转为了字符A
【阶段3】线程pool-1-thread-1执行完毕,已将A增加到了结果集[C, D, B, A]中
完成!result=[C, D, B, A]
|