IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 数据结构与算法 -> 浅析Fork/Join -> 正文阅读

[数据结构与算法]浅析Fork/Join

一、算法

1、二分法

假设我们遇到一个规模很大的问题,很难一下子解决。那么要怎么处理呢?

思路一:能不能把这个大规模的问题变为每次减少一点呢?比如从n到n-1。
思路二:既然可以每次减少一点,那能不能每次减少几倍?效果会不会更好?

在这里插入图片描述
这里我们介绍思路二中的一个比较有名的算法——二分查找(也叫折半查找)。
在这里插入图片描述

具体实现思路:在规模较大的问题n中,查找目标值a,每次折半,然后看目标数值a所在的区间,然后继续折半,直到匹配出具体的值。

2、分治法

思路:把一个规模为n的问题,一直对半拆分,拆到可以求解为止,然后将结果整合起来。

在这里插入图片描述

二、实践

fork/join框架

以分治的思想使用线程

fork/join框架是java7提供的用于并行执行任务的框架。

举个例子

public class ForkJoinTestDemo extends RecursiveTask<Long> {
    public long start;
    public long end;
    
    public ForkJoinTestDemo(){}
    public ForkJoinTestDemo(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if((end-start)==0){
            System.out.println("fork end,start join:"+end);
            return end;
        }else {
            long mid=(start+end)/2;
            log.info("start="+start+",end="+end+",mid="+mid);
            ForkJoinTestDemo f1=new ForkJoinTestDemo(start,mid);
            f1.fork();
            ForkJoinTestDemo f2=new ForkJoinTestDemo(mid+1,end);
            f2.fork();
            return f1.join()+f2.join();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool=new ForkJoinPool();
        //代码示例:求1~8数字之和。
        System.out.println(pool.invoke(new ForkJoinTestDemo(1,8)));
    }

}

打印结果:

start=1,end=8,mid=4
start=5,end=8,mid=6
start=1,end=4,mid=2
start=5,end=6,mid=5
start=1,end=2,mid=1
start=7,end=8,mid=7
fork end,start join:1
fork end,start join:2
fork end,start join:8
start=3,end=4,mid=3
fork end,start join:3
fork end,start join:4
fork end,start join:5
fork end,start join:6
fork end,start join:7
36

可以看出,分为两个步骤:fork和join。fork阶段做问题拆分,join整合结果。

实现原理

1)类关系

上面的例子中,我们创建的ForkJoinTestDemo类,继承自RecursiveTask。关系图如下:
在这里插入图片描述
Future:提供了可以获取异步执行的返回值的接口。
ForkJoinTask: 提供在任务中执行fork()和join()操作的机制。主要负责在 ForkJoinWorkerThread 和 ForkJoinPool 方法的中维护其“状态”字段。
RecursiveTask:提供一个带有结果的递归调用。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

2)主要方法

compute()

在RecursiveTask中,实现了ForkJoinTask的exec(),在exec()中执行了compute()。并把计算的结果保存下来。

执行任务,成功时返回true。

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {

	V result;
	protected final boolean exec() {
	        result = compute();
	        return true;
	    }
}

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    protected abstract boolean exec();
}

fork()

判断是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

join()


 public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

/*
* 步骤一:判断是否完成:若是-直接返回s。若不是,继续执行。
* 步骤二:判断是否是ForkJoinWorkerThread,若不是则执行externalAwaitDone()。若是继续执行。
* 步骤三:判断要join的任务正好在当前任务队列的顶端,那么pop出这个任务,然后调用 doExec() 让当 * 前线程去执行这个任务。否则执行awaitJoin()。
*/
private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
//当给定任务位于顶部时,弹出该任务。
 final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                U.putOrderedInt(this, QTOP, s);
                return true;
            }
            return false;
        }
//任务窃取的主要执行方法。调用 exec 并记录状态。
//其中,上面提到过,我们对exec()重写,自定义处理规则。
final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

//当给定任务不在顶部时,帮助和或阻塞,直到给定任务完成或超时。
   final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask<?> prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>)task : null;
            for (;;) {
            	//任务完成,跳出循环
                if ((s = task.status) < 0)
                    break;
                if (cc != null)
   //当前任务是CountedCompleter类型,则尝试从任务队列中获取当前任务的派生子任务来执行完成任务
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))
         //窃取任务,如果当前线程的内部队列为空,或者成功完成了任务,帮助某个线程完成任务。
                    helpStealer(w, task);
                //任务完成,跳出循环
                if ((s = task.status) < 0)
                    break;
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
                if (tryCompensate(w)) {
                    task.internalWait(ms);
                    U.getAndAddLong(this, CTL, AC_UNIT);
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
    }

三、总结

  • fork/join框架是使用分治法的思想,来合理利用线程。
  • fork/join框架作为并发框架,与单一线程执行相比不一定会快,要做好相关压测和调优。
  • 使用“工作窃取”的方式,优化多线程下的任务执行。使用Unsafe直接操作实现原子性。
  数据结构与算法 最新文章
【力扣106】 从中序与后续遍历序列构造二叉
leetcode 322 零钱兑换
哈希的应用:海量数据处理
动态规划|最短Hamilton路径
华为机试_HJ41 称砝码【中等】【menset】【
【C与数据结构】——寒假提高每日练习Day1
基础算法——堆排序
2023王道数据结构线性表--单链表课后习题部
LeetCode 之 反转链表的一部分
【题解】lintcode必刷50题<有效的括号序列
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 13:22:01  更:2022-03-06 13:24:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/10 2:06:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码