IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Java的Fork-Join框架 -> 正文阅读

[Java知识库]Java的Fork-Join框架

最近在看Java stream中的parallel()方法,此方法能够将一个串行的任务流转化为并行处理以加快其处理效率。这一点非常有趣,在看过源码后发现是其使用了Java 1.7提供的Fork-Join框架在发挥作用。今天我们来说明一下Fork-Join框架的作用。

Fork-Join是干啥的

众所周知,有一组任务需要处理,如果没有明确的顺序先后的要求,并行处理会比串行处理快很多,更好的利用了CPU的性能。而Java中的Fork-Join框架类似于大数据处理方案中的map-reduce的分治思想:它能将一组多个任务拆分(Fork)成许多小组并行处理,在处理结束后再对所有并行的结果进行合并(Join),最终达到加速处理的目的。

Fork-Join Pool又是干啥的

Fork-Join Pool 是基于Fork-Join框架提供的一款线程池,它同样能提供固定数量的线程,使多个任务并行运行。

那它与传统的线程池有什么区别呢?

传统线程池?使用了一个队列维护了一组需要并行处理的任务,采用FIFO原则,线程池中的线程们会从任务队列中不断的拿取新的任务进行处理,直至队列中任务处理完成,线程再去poll时被阻塞直至有新的任务进入队列。

Fork-Join Pool?中的每一个线程维护了一个队列,存着自己线程需要完成的任务。如果任务过大,则会继续递归拆分为更小的子任务,并push进当前线程维护的队列。同时为了最大化CPU利用率,保证线程池在有任务的情况下尽可能的满载,它提供了一个工作量窃取算法来降低整体完成时间:当某个线程维护的任务队列中的任务全部完成,次线程就会去其他线程寻找还没开始的任务,从对方的队列尾部抽取出来并放在自己的队列中开始运行。

额外需要注意的:

  • Fork-Join Pool中每个线程维护的是一个双端队列,既可以FIFO,也可以LIFO。
  • 在线程拆分任务时,会把拆分出来的子任务放在自己队列的栈顶;当线程消费自己的队列时,遵从LIFO,从栈顶提取任务;而窃取其他线程的队列任务时,遵从FIFO,从尾部提取任务。因此更多可能窃取到的是大任务,减少了调度次数,降低了切换成本(可见设计者的思路多么巧妙)。
  • 在线程push子任务到自己的队列以及消费队列任务时无需加锁。窃取时需要加锁。试想当A线程的队列中只有1个元素时,B线程队列的任务消耗一空。此时窃取算法让B线程去A线程的队列中提取任务,资源抢占可能会产生并发问题(例如重复消费任务)。Fork-Join Pool通过Java中的CAS解决了这个问题,既保证了在并发场景下的同步,同时保证了足够轻量级。
  • Fork-Join Pool如果不指定线程个数,默认以当前CPU的核数为线程个数。

举几个栗子🌰

光说不练假把式,我们来用几个栗子来说明一波。

1. 使用RecursiveAction

RecursiveAction类是一个实现了ForkJoinTask接口的抽象类,提供了无返回值的compute()方法实现业务逻辑。主要针对无需返回值的并行任务。

上代码!

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;

class PrintTask extends RecursiveAction {

  private final int start;

  private final int end;

  public PrintTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  protected void compute() {
    if (end - start < 5) { // 如果任务足够小,直接执行
      for (int i = start; i <= end; i++) {
        System.out.println(Thread.currentThread().getName() + "::" + i);
      }
    } else { // 如果任务较大,递归拆分为小任务
      System.out.println("********拆分任务********");
      int middle = (start + end) / 2;
      PrintTask left = new PrintTask(start, middle);
      PrintTask right = new PrintTask(middle + 1, end);
      // 并行执行两个小任务
      left.fork();
      right.fork();
    }
  }
}

public class ForkJoinPoolTest {

  public static void main(String[] args) throws InterruptedException {
    System.out.println("CPU核数为:" + Runtime.getRuntime().availableProcessors());

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    forkJoinPool.submit(new PrintTask(1, 50));
    forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
    forkJoinPool.shutdown();
  }

}

运行结果:

CPU核数为:8
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
ForkJoinPool-1-worker-7::34
ForkJoinPool-1-worker-7::35
ForkJoinPool-1-worker-7::36
ForkJoinPool-1-worker-1::4
ForkJoinPool-1-worker-1::5
ForkJoinPool-1-worker-1::6
ForkJoinPool-1-worker-13::1
ForkJoinPool-1-worker-13::2
ForkJoinPool-1-worker-13::3
ForkJoinPool-1-worker-13::7
ForkJoinPool-1-worker-13::8
ForkJoinPool-1-worker-13::9
ForkJoinPool-1-worker-13::25
ForkJoinPool-1-worker-13::26
ForkJoinPool-1-worker-13::27
********拆分任务********
ForkJoinPool-1-worker-13::40
ForkJoinPool-1-worker-13::41
ForkJoinPool-1-worker-13::42
ForkJoinPool-1-worker-13::37
ForkJoinPool-1-worker-13::38
ForkJoinPool-1-worker-1::43
ForkJoinPool-1-worker-1::44
ForkJoinPool-1-worker-1::45
********拆分任务********
ForkJoinPool-1-worker-1::16
ForkJoinPool-1-worker-1::17
ForkJoinPool-1-worker-1::18
ForkJoinPool-1-worker-7::31
...

2. 使用RecursiveTask

RecursiveTask类通用是一个实现了ForkJoinTask接口的抽象类,提供了有返回值的compute()方法实现业务逻辑。主要针对有返回值的并行任务。

继续上代码!

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Integer> {

  private final int start;

  private final int end;

  public SumTask(int start, int end) {
    this.start = start;
    this.end = end;
  }

  @Override
  protected Integer compute() {
    int sum = 0;
    if (end - start < 5) { // 如果任务足够小,直接执行
      for (int i = start; i <= end; i++) {
        sum += i;
      }
    } else { // 如果任务较大,递归拆分为小任务
      System.out.println("********拆分任务********");
      int middle = (start + end) / 2;
      SumTask left = new SumTask(start, middle);
      SumTask right = new SumTask(middle + 1, end);
      // 并行执行两个小任务
      left.fork();
      right.fork();
      return left.join() + right.join();
    }
    return sum;
  }
}

public class ForkJoinPoolTest {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    System.out.println("CPU核数为:" + Runtime.getRuntime().availableProcessors());
    System.out.println("期望1-50累加得到的结果为:1275");

    ForkJoinPool forkJoinPool = new ForkJoinPool();
    ForkJoinTask<Integer> result = forkJoinPool.submit(new SumTask(1, 50));
    System.out.println("通过ForkJoin算出的值为:" + result.get());
    forkJoinPool.shutdown();
  }

}

运行结果:

CPU核数为:8
期望1-50累加得到的结果为:1275
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
********拆分任务********
通过ForkJoin算出的值为:1275

总结

ForkJoin算法为JDK 1.7及其之后的并行运算提供了更多的可能,它使用了更合适的方式尽可能多的“压榨”硬件的空闲时间以达到加快运算效率的能力。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-11-11 12:35:06  更:2021-11-11 12:36:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 1:12:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码