Fork/Join
jdk除了提供ThreadPoolExecutor这样的线程池,还提供了ForkJoinPool,顾名思义,先fork,再join,先分解任务,处理各个子任务,再合并,采用“分治”的思想
分治,分而治之。把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解
像归并排序,快速排序,二分查找都属于分支算法,以及大数据领域计算框架MapReduce同样利用了分治思想
分治任务模型
-
任务的分解 ⅰ将任务分解到可以直接计算结果的粒度 -
结果的合并 ⅱ逐层合并子任务的执行结果
Fork/Join的使用
Fork/Join 是支持分治任务模型的并行计算框架。
Fork对应分治任务模型里的任务分解,Join对应的是结果合并。
Fork/Join框架包含ForkJoinPool和ForkJoinTask,
ForkJoinTask是abstract的,核心方法有fork()和join(),fork()异步地执行一个子任务,join()会阻塞当前线程等待子任务的执行结果。
RecursiveAction和RecursiveTask是ForkJoinTask的abstract子类,我们使用Fork/Join时,通常是继承前者,利用compute()实现并行计算的能力。 RecursiveAction#compute()无返回值,RecursiveTask#compute()有返回值
public class FibonacciTask extends RecursiveTask<Integer> {
private Integer n;
public FibonacciTask(Integer n) {
this.n = n;
}
@Override protected Integer compute() {
if (n <= 1) {
return n;
}
FibonacciTask task1 = new FibonacciTask(n-1);
task1.fork();
FibonacciTask task2 = new FibonacciTask(n-2);
return task2.compute() + task1.join();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
RecursiveTask<Integer> task = new FibonacciTask(10);
forkJoinPool.submit(task);
System.out.println(task.get());
}
}
众所周知,ThreadPoolExecutor内部使用一个任务队列来存放任务,而ForkJoinPool内部使用了多个任务队列。当使用ForkJoinPool#invoke()或ForkJoinPool#submit() 提交任务时,通过一定的路由规则提交到某个任务队列。且两者本质上都是“生产者-消费者模式”的实现
如果任务在执行过程中会创建子任务,子任务也会提交到工作线程对应的任务队列中
Fork/Join支持“任务窃取”的机制,工作线程对应的工作队列为空时,可以“窃取”其他工作队列中的任务,保障并行任务的高效率。这里任务队列是双端队列,工作线程正常执行任务和“窃取”任务分别从任务队列的不同端获取
模拟MapReduce统计单词数量
public class MapReduceTask extends RecursiveTask<Map<String, Integer>> {
private String[] data;
private int start;
private int end;
public MapReduceTask(String[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
@Override protected Map<String, Integer> compute() {
if (end - start <= 1) {
return countWords(data[start]);
}
int mid = (start + end)/2;
MapReduceTask task1 = new MapReduceTask(data, start, mid);
task1.fork();
MapReduceTask task2 = new MapReduceTask(data, mid, end);
return merge(task2.compute(), task1.join());
}
private Map<String, Integer> countWords(String line) {
String[] wordsPerRow = line.split(" ");
Map<String, Integer> result = new HashMap<>(wordsPerRow.length);
Arrays.stream(wordsPerRow).forEach(str -> {
Integer sum = result.get(str);
if (sum == null) {
result.put(str, 1);
} else {
result.put(str, sum + 1);
}
});
return result;
}
private Map<String, Integer> merge(Map<String, Integer> source, Map<String, Integer> target) {
Map<String, Integer> result = new HashMap<>();
result.putAll(source);
target.forEach( (k, v)-> {
Integer count = result.get(k);
if (count != null) {
result.put(k, count + v);
} else {
result.put(k, v);
}
});
return result;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
String[] data = {
"source target merge result", "source target merge result", "one two three", "one two second",
"import java util concurrent RecursiveTask", "import java util concurrent RecursiveTask",
"import java util concurrent RecursiveTask", "import java util concurrent RecursiveTask hg"
};
MapReduceTask task = new MapReduceTask(data, 0, data.length);
Map<String, Integer> result = forkJoinPool.invoke(task);
result.forEach((k, v) -> System.out.println(String.format("[%s = %s]", k, v)));
}
}
jdk8 中stream并行流的计算共享了系统提供的一个ForkJoinPool,线程数默认是CPU的核数,如果并行流计算都是CPU密集型,计算没有问题,如果存在I/O密集型的并行流计算,很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。
建议使用不同的ForkJoinPool执行不同类型的计算任务。
|