最近在看Java stream中的parallel()方法,此方法能够将一个串行的任务流转化为并行处理以加快其处理效率。这一点非常有趣,在看过源码后发现是其使用了Java 1.7提供的Fork-Join框架在发挥作用。今天我们来说明一下Fork-Join框架的作用。
Fork-Join是干啥的
众所周知,有一组任务需要处理,如果没有明确的顺序先后的要求,并行处理会比串行处理快很多,更好的利用了CPU的性能。而Java中的Fork-Join框架类似于大数据处理方案中的map-reduce的分治思想:它能将一组多个任务拆分(Fork)成许多小组并行处理,在处理结束后再对所有并行的结果进行合并(Join),最终达到加速处理的目的。
Fork-Join Pool又是干啥的
Fork-Join Pool 是基于Fork-Join框架提供的一款线程池,它同样能提供固定数量的线程,使多个任务并行运行。
那它与传统的线程池有什么区别呢?
传统线程池?使用了一个队列维护了一组需要并行处理的任务,采用FIFO原则,线程池中的线程们会从任务队列中不断的拿取新的任务进行处理,直至队列中任务处理完成,线程再去poll时被阻塞直至有新的任务进入队列。
Fork-Join Pool?中的每一个线程维护了一个队列,存着自己线程需要完成的任务。如果任务过大,则会继续递归拆分为更小的子任务,并push进当前线程维护的队列。同时为了最大化CPU利用率,保证线程池在有任务的情况下尽可能的满载,它提供了一个工作量窃取算法来降低整体完成时间:当某个线程维护的任务队列中的任务全部完成,次线程就会去其他线程寻找还没开始的任务,从对方的队列尾部抽取出来并放在自己的队列中开始运行。
额外需要注意的:
- Fork-Join Pool中每个线程维护的是一个双端队列,既可以FIFO,也可以LIFO。
- 在线程拆分任务时,会把拆分出来的子任务放在自己队列的栈顶;当线程消费自己的队列时,遵从LIFO,从栈顶提取任务;而窃取其他线程的队列任务时,遵从FIFO,从尾部提取任务。因此更多可能窃取到的是大任务,减少了调度次数,降低了切换成本(可见设计者的思路多么巧妙)。
- 在线程push子任务到自己的队列以及消费队列任务时无需加锁。窃取时需要加锁。试想当A线程的队列中只有1个元素时,B线程队列的任务消耗一空。此时窃取算法让B线程去A线程的队列中提取任务,资源抢占可能会产生并发问题(例如重复消费任务)。Fork-Join Pool通过Java中的CAS解决了这个问题,既保证了在并发场景下的同步,同时保证了足够轻量级。
- Fork-Join Pool如果不指定线程个数,默认以当前CPU的核数为线程个数。
举几个栗子🌰
光说不练假把式,我们来用几个栗子来说明一波。
1. 使用RecursiveAction
RecursiveAction类是一个实现了ForkJoinTask接口的抽象类,提供了无返回值的compute()方法实现业务逻辑。主要针对无需返回值的并行任务。
上代码!
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
class PrintTask extends RecursiveAction {
private final int start;
private final int end;
public PrintTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start < 5) { // 如果任务足够小,直接执行
for (int i = start; i <= end; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
} else { // 如果任务较大,递归拆分为小任务
System.out.println("********拆分任务********");
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle + 1, end);
// 并行执行两个小任务
left.fork();
right.fork();
}
}
}
public class ForkJoinPoolTest {
public static void main(String[] args) throws InterruptedException {
System.out.println("CPU核数为:" + Runtime.getRuntime().availableProcessors());
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.submit(new PrintTask(1, 50));
forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
forkJoinPool.shutdown();
}
}
运行结果:
CPU核数为:8 ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ForkJoinPool-1-worker-7::34 ForkJoinPool-1-worker-7::35 ForkJoinPool-1-worker-7::36 ForkJoinPool-1-worker-1::4 ForkJoinPool-1-worker-1::5 ForkJoinPool-1-worker-1::6 ForkJoinPool-1-worker-13::1 ForkJoinPool-1-worker-13::2 ForkJoinPool-1-worker-13::3 ForkJoinPool-1-worker-13::7 ForkJoinPool-1-worker-13::8 ForkJoinPool-1-worker-13::9 ForkJoinPool-1-worker-13::25 ForkJoinPool-1-worker-13::26 ForkJoinPool-1-worker-13::27 ********拆分任务******** ForkJoinPool-1-worker-13::40 ForkJoinPool-1-worker-13::41 ForkJoinPool-1-worker-13::42 ForkJoinPool-1-worker-13::37 ForkJoinPool-1-worker-13::38 ForkJoinPool-1-worker-1::43 ForkJoinPool-1-worker-1::44 ForkJoinPool-1-worker-1::45 ********拆分任务******** ForkJoinPool-1-worker-1::16 ForkJoinPool-1-worker-1::17 ForkJoinPool-1-worker-1::18 ForkJoinPool-1-worker-7::31 ...
2. 使用RecursiveTask
RecursiveTask类通用是一个实现了ForkJoinTask接口的抽象类,提供了有返回值的compute()方法实现业务逻辑。主要针对有返回值的并行任务。
继续上代码!
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class SumTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - start < 5) { // 如果任务足够小,直接执行
for (int i = start; i <= end; i++) {
sum += i;
}
} else { // 如果任务较大,递归拆分为小任务
System.out.println("********拆分任务********");
int middle = (start + end) / 2;
SumTask left = new SumTask(start, middle);
SumTask right = new SumTask(middle + 1, end);
// 并行执行两个小任务
left.fork();
right.fork();
return left.join() + right.join();
}
return sum;
}
}
public class ForkJoinPoolTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("CPU核数为:" + Runtime.getRuntime().availableProcessors());
System.out.println("期望1-50累加得到的结果为:1275");
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> result = forkJoinPool.submit(new SumTask(1, 50));
System.out.println("通过ForkJoin算出的值为:" + result.get());
forkJoinPool.shutdown();
}
}
运行结果:
CPU核数为:8 期望1-50累加得到的结果为:1275 ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** ********拆分任务******** 通过ForkJoin算出的值为:1275
总结
ForkJoin算法为JDK 1.7及其之后的并行运算提供了更多的可能,它使用了更合适的方式尽可能多的“压榨”硬件的空闲时间以达到加快运算效率的能力。
|