ForkJoin
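Java 7 introduced the Fork/Join thread pool, which runs a special kind of task: a large task is split into several smaller tasks that execute in parallel. This is the "divide and conquer" approach.
ForkJoin speeds up a computation by running the same calculation on different parts of the input across multiple threads. Google's big-data processing model MapReduce applies a similar divide-and-conquer idea to speed up large-scale data processing.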
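How to use it
To use the ForkJoin framework, you create a ForkJoin task. The framework provides two base classes for this: RecursiveAction, which returns no value, and RecursiveTask, which returns one. Their source is as follows: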
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    protected abstract void compute();

    public final Void getRawResult() { return null; }

    protected final void setRawResult(Void mustBeNull) { }

    protected final boolean exec() {
        compute();
        return true;
    }
}

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    V result;

    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    protected final boolean exec() {
        result = compute();
        return true;
    }
}
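To make the difference between the two base classes concrete, here is a minimal sketch. The names RawResultDemo, FillAction, TotalTask and runBoth are hypothetical and exist only for illustration. It shows that invoking a RecursiveAction hands back null (its getRawResult() always returns null), while invoking a RecursiveTask hands back whatever compute() returned.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class RawResultDemo {

    // Hypothetical action: fills an array as a side effect and yields no value.
    static class FillAction extends RecursiveAction {
        private final int[] target;
        private final int value;

        FillAction(int[] target, int value) {
            this.target = target;
            this.value = value;
        }

        @Override
        protected void compute() {
            Arrays.fill(target, value);
        }
    }

    // Hypothetical task: computes a value, which exec() stores as the task result.
    static class TotalTask extends RecursiveTask<Integer> {
        private final int[] source;

        TotalTask(int[] source) {
            this.source = source;
        }

        @Override
        protected Integer compute() {
            return Arrays.stream(source).sum();
        }
    }

    // Runs both kinds of task and returns what invoke() handed back for each.
    static Object[] runBoth(int[] data, int value) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            Void none = pool.invoke(new FillAction(data, value)); // always null
            Integer total = pool.invoke(new TotalTask(data));     // value from compute()
            return new Object[] { none, total };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Object[] results = runBoth(new int[4], 7);
        System.out.println("RecursiveAction result: " + results[0]);
        System.out.println("RecursiveTask result:   " + results[1]);
    }
}
```

Neither task splits its work here; the sketch only isolates the return-value contract of the two classes.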
Code example
We only need to extend one of the two classes the JDK provides and implement its compute method. For example, to compute a sum:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ForkJoinRecursiveAction {
    // Ranges no wider than this are summed directly instead of being split.
    private static final int MAX_THRESHOLD = 3;
    // RecursiveAction returns nothing, so partial sums go into a shared counter.
    private static final AtomicInteger SUM = new AtomicInteger(0);

    public static void main(String[] args) {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        // invoke() blocks until the root task, and every subtask it waits on, has finished.
        forkJoinPool.invoke(new CalculateRecursiveAction(0, 10));
        forkJoinPool.shutdown();
        System.out.println(SUM.get());
    }

    private static class CalculateRecursiveAction extends RecursiveAction {
        private final int start;
        private final int end;

        private CalculateRecursiveAction(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            if ((end - start) <= MAX_THRESHOLD) {
                // Small enough: add this range's sum to the shared counter.
                SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
            } else {
                int middle = (start + end) / 2;
                // invokeAll() forks the subtasks and returns only after both have completed.
                invokeAll(new CalculateRecursiveAction(start, middle),
                          new CalculateRecursiveAction(middle + 1, end));
            }
        }
    }
}
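The same kind of sum with RecursiveTask, which returns its result instead of writing to a shared counter:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class ForkJoinRecursiveTask {
    // Ranges no wider than this are summed directly instead of being split.
    private static final int MAX_THRESHOLD = 200;

    public static void main(String[] args) {
        final ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> future = forkJoinPool.submit(new CalculatedRecursiveTask(0, 1000));
        try {
            // get() blocks until the task has produced its result.
            Integer result = future.get();
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            forkJoinPool.shutdown();
        }
    }

    private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
        private final int start;
        private final int end;

        CalculatedRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if (end - start <= MAX_THRESHOLD) {
                return IntStream.rangeClosed(start, end).sum();
            } else {
                int middle = (start + end) / 2;
                CalculatedRecursiveTask leftTask = new CalculatedRecursiveTask(start, middle);
                CalculatedRecursiveTask rightTask = new CalculatedRecursiveTask(middle + 1, end);
                // Fork the left half so another worker can pick it up, and compute
                // the right half in the current thread rather than leaving it idle.
                leftTask.fork();
                int rightResult = rightTask.compute();
                return leftTask.join() + rightResult;
            }
        }
    }
}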
ForkJoinPool
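Tasks are executed through a ForkJoinPool. Each worker thread owns a double-ended queue (deque). Subtasks created by fork() are pushed onto the head of the current worker's deque, and by default the worker takes its next task from that same end. When a worker has no tasks left, it takes one from the tail of another worker's deque (work stealing).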
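The behavior described above can be observed with a small sketch. The class WorkStealingDemo, the nested RangeSum task, the sumRange helper and the THRESHOLD value are all hypothetical names chosen for illustration. The sketch runs a recursive sum on a pool with an explicit parallelism level and then prints ForkJoinPool.getStealCount(), which the JDK documents as an estimate of the number of tasks stolen between workers.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class WorkStealingDemo {
    // Hypothetical cutoff: ranges no wider than this are summed in a plain loop.
    private static final int THRESHOLD = 1_000;

    static class RangeSum extends RecursiveTask<Long> {
        private final long start;
        private final long end;

        RangeSum(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                return sum;
            }
            long middle = start + (end - start) / 2;
            RangeSum left = new RangeSum(start, middle);
            RangeSum right = new RangeSum(middle + 1, end);
            // fork() pushes the left half onto this worker's deque,
            // where an idle worker may steal it.
            left.fork();
            long rightResult = right.compute();
            return left.join() + rightResult;
        }
    }

    // Sums start..end (inclusive) on a pool with the given parallelism.
    static long sumRange(long start, long end, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long result = pool.invoke(new RangeSum(start, end));
            // The steal count is an estimate and varies from run to run.
            System.out.println("parallelism = " + pool.getParallelism()
                    + ", steals (estimate) = " + pool.getStealCount());
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumRange(1, 1_000_000, 4));
    }
}
```

With a parallelism of 1 there is no other worker to steal from, so comparing the printed steal count for parallelism 1 and 4 is a quick way to see work stealing in action.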