IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Fork/Join 2021-09-01 -> 正文阅读

[大数据]Fork/Join 2021-09-01

Fork/Join

jdk除了提供ThreadPoolExecutor这样的线程池,还提供了ForkJoinPool,顾名思义,先fork,再join,先分解任务,处理各个子任务,再合并,采用“分治”的思想

分治,分而治之。把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解

像归并排序,快速排序,二分查找都属于分支算法,以及大数据领域计算框架MapReduce同样利用了分治思想

分治任务模型

  1. 任务的分解

    ⅰ将任务分解到可以直接计算结果的粒度

  2. 结果的合并

    ⅱ逐层合并子任务的执行结果

Fork/Join的使用

Fork/Join 是支持分治任务模型的并行计算框架。

Fork对应分治任务模型里的任务分解,Join对应的是结果合并。

Fork/Join框架包含ForkJoinPool和ForkJoinTask,

ForkJoinTask是abstract的,核心方法有fork()和join(),fork()异步地执行一个子任务,join()会阻塞当前线程等待子任务的执行结果。

RecursiveAction和RecursiveTask是ForkJoinTask的abstract子类,我们使用Fork/Join时,通常是继承前者,利用compute()实现并行计算的能力。 RecursiveAction#compute()无返回值,RecursiveTask#compute()有返回值

public class FibonacciTask extends RecursiveTask<Integer> {

    private Integer n;

    public FibonacciTask(Integer n) {
        this.n = n;
    }

    @Override protected Integer compute() {
        if (n <= 1) {
            return n;
        }

        FibonacciTask task1 = new FibonacciTask(n-1);
        task1.fork();

        FibonacciTask task2 = new FibonacciTask(n-2);
        // compute() 和 join() 的顺序不能交换,task1.join() 会挂起当前线程等待合并任务执行完成,
        // 由于task2.compute()不能执行,导致并行度下降
        return task2.compute() + task1.join();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        RecursiveTask<Integer> task = new FibonacciTask(10);
        forkJoinPool.submit(task);
        System.out.println(task.get());
    }
}

众所周知,ThreadPoolExecutor内部使用一个任务队列来存放任务,而ForkJoinPool内部使用了多个任务队列。当使用ForkJoinPool#invoke()或ForkJoinPool#submit() 提交任务时,通过一定的路由规则提交到某个任务队列。且两者本质上都是“生产者-消费者模式”的实现

如果任务在执行过程中会创建子任务,子任务也会提交到工作线程对应的任务队列中

Fork/Join支持“任务窃取”的机制,工作线程对应的工作队列为空时,可以“窃取”其他工作队列中的任务,保障并行任务的高效率。这里任务队列是双端队列,工作线程正常执行任务和“窃取”任务分别从任务队列的不同端获取

在这里插入图片描述

模拟MapReduce统计单词数量

/**
 * @version 1.0.0
 * @description 统计文章中单词出现的次数
 * @date 2021/9/1 12:35
 */
public class MapReduceTask extends RecursiveTask<Map<String, Integer>> {

    private String[] data;
    private int start;
    private int end;

    public MapReduceTask(String[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override protected Map<String, Integer> compute() {
        if (end - start <= 1) {
            return countWords(data[start]);
        }

        int mid = (start + end)/2;
        MapReduceTask task1 = new MapReduceTask(data, start, mid);
        task1.fork();

        MapReduceTask task2 = new MapReduceTask(data, mid, end);
        return merge(task2.compute(), task1.join());
    }

    private Map<String, Integer> countWords(String line) {
        String[] wordsPerRow = line.split(" ");
        Map<String, Integer> result = new HashMap<>(wordsPerRow.length);
        Arrays.stream(wordsPerRow).forEach(str -> {
            Integer sum = result.get(str);
            if (sum == null) {
                result.put(str, 1);
            } else {
                result.put(str, sum + 1);
            }
        });
        return result;
    }

    private Map<String, Integer> merge(Map<String, Integer> source, Map<String, Integer> target) {
        Map<String, Integer> result = new HashMap<>();
        result.putAll(source);
        target.forEach( (k, v)-> {
            Integer count = result.get(k);
            if (count != null) {
                result.put(k, count + v);
            } else {
                result.put(k, v);
            }
        });
        return result;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        String[] data = {
            "source target merge result", "source target merge result", "one two three", "one two second",
            "import java util concurrent RecursiveTask", "import java util concurrent RecursiveTask",
            "import java util concurrent RecursiveTask", "import java util concurrent RecursiveTask hg"
        };
        MapReduceTask task = new MapReduceTask(data, 0, data.length);
        Map<String, Integer> result = forkJoinPool.invoke(task);
        result.forEach((k, v) -> System.out.println(String.format("[%s = %s]", k, v)));
    }
}

jdk8 中stream并行流的计算共享了系统提供的一个ForkJoinPool,线程数默认是CPU的核数,如果并行流计算都是CPU密集型,计算没有问题,如果存在I/O密集型的并行流计算,很可能会因为一个很慢的I/O计算而拖慢整个系统的性能。

建议使用不同的ForkJoinPool执行不同类型的计算任务

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-02 11:26:45  更:2021-09-02 11:29:53 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:03:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码