目录
一、介绍
二、工作窃取算法
三、Fork/Join实例
1. 分割任务
2.?ForkJoinPool线程池执行任务
四、Fork/Join框架实现原理
1.?ForkJoinPool线程池
2.?ForkJoinTask
3. fork()
4. join()
五、参考资料
一、介绍
????????Fork/Join框架是一个并行执行任务的框架,目的是把大任务分割成若干个子任务(每个任务否是一个Fork,且每个任务还可以再分割),最终每个任务执行结果合并到总结果。如下图所示,是Fork/Join的运行流程图。
? ? ? ? 如上图看出,Fork就是把一个大任务切分为若干子任务并行的执行;Join就是合并子任务的执行结果。
二、工作窃取算法
????????工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。如下图所示,线程1负责处理A队列里的任务,线程2负责处理B队列里的任务。线程2已经把自己队列里的任务干完,于是线程2就去线程1的队列里窃取一个任务来执行。????????
? ? ? ? 工作窃取算法的特性:
- 优点:并行计算,来提高执行效率;双端队列,减少线程间竞争。
- 缺点:双端队列只有一个任务时,也会产生线程间竞争。
? ? ? ? 如上图所示, 采用双端队列,是为了减少窃取任务线程和被窃取任务线程之间的竞争。被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
三、Fork/Join实例
1. 分割任务
? ? ? ? 分割任务就是把大任务分割成多个子任务。如下代码所示,分割阈值THRESHOLD为20,较小的任务不用分割,直接计算累加求和。
/**
* @description 任务分割
* @author TCM
* @version 1.0
* @date 2022/4/13 16:11
**/
public class SumTask extends RecursiveTask<Integer> {
// 分割子任务阈值
private static final int THRESHOLD = 20;
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
// 如果小于阈值,直接调用最小任务的计算方法
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算,即:2个fork
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(start, middle);
SumTask rightTask = new SumTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
LogUtil.info("当前计算结果为:" + sum);
}
return sum;
}
}
2.?ForkJoinPool线程池执行任务
? ? ? ? 创建任务后及任务分割后的子任务,都交给ForkJoinPool线程池来执行任务。
public static void main(String[] args) {
// ForkJoinPool执行分割的任何
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4+...+100
SumTask task = new SumTask(1, 100);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (Exception e) {
e.printStackTrace();
}
}
//执行结果
20:13:51.871 [ForkJoinPool-1-worker-2] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:1575
20:13:51.871 [ForkJoinPool-1-worker-4] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:950
20:13:51.871 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:325
20:13:51.871 [ForkJoinPool-1-worker-6] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:2200
20:13:51.874 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:1275
20:13:51.874 [ForkJoinPool-1-worker-11] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:3775
20:13:51.874 [ForkJoinPool-1-worker-9] INFO LOG_INFO - 127.0.0.1||^当前计算结果为:5050
计算结果:1+2+3+4+...+100 = 5050
四、Fork/Join框架实现原理
1.?ForkJoinPool线程池
????????ForkJoinPool线程池会创建多个ForkJoinWorkerThread实例的线程,ForkJoinWorkerThread线程用来执行任务。每个线程都有一个工作队列java.util.concurrent.ForkJoinPool.WorkQueue,该队列有属性ForkJoinTask<?>[] array,用来存放当前线程需要执行的任务。所以,ForkJoinTask数组负责存放提交的任务,ForkJoinWorkerThread线程负责执行这些任务。而如上章节《介绍》图中所示。
2.?ForkJoinTask
? ? ? ? ?ForkJoinTask定义了执行的任务。如下图所示是ForkJoinTask的实现类。如上章节《Fork/Join实例》中的分割任务,不需要直接继承ForkJoinTask类,只需要继承它的子类即可。
? ? ? ? 覆写子类中的compute(),规定符合阈值时,任务再分割多个子任务。
3. fork()
? ? ? ? 如下代码所示。任务调用fork(),push(this)把当前任务存放到ForkJoinTask[]中,并通过线程池调用signalWork()方法来唤醒和创建一个工作线程ForkJoinWorkerThread执行任务。
// 执行子任务
public final ForkJoinTask<V> fork() {
Thread t;
// 判定当前线程是否是ForkJoinWorkerThread实例线程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
// 把调用fork()的任务加入到ForkJoinWorkerThread实例线程的workQueue工作队列的ForkJoinTask[]中
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
// 当前任务添加到ForkJoinTask[]中,并唤醒线程执行任务
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
// 唤醒和创建一个工作线程ForkJoinWorkerThread来执行任务
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
4. join()
???????? 如下代码所示。任务调用join(),阻塞执行当前任务的线程,来获取执行结果。根据doJoin()返回的任务状态,只有状态等于NORMAL时,表示任务正常结束。
// 获取当前线程执行结果
public final V join() {
int s;
// doJoin()返回任务状态来判断执行结果
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
// 状态 == NORMAL时,表示任务正常结束,获取执行结果
return getRawResult();
}
五、参考资料
Fork/Join框架原理解析_华山拎壶冲的博客-CSDN博客_fork join
ForkJoin框架使用和原理剖析_codingtu的博客-CSDN博客_forkjointask
Fork/Join框架的理解和使用 - 简书
Java并发编程|Fork/Join框架机制详解
Fork/Join框架详解 - 木易森林 - 博客园
|