Fork/Join(JAVA版MapReduce)
分治思想
接触过大数据的同学都应该了解过大数据知名计算框架MapReduce,MapReduce采用任务分解,结果合并的方式简单方便的完成大数据编程和计算处理,对于我们JAVA而言同样也是有类似计算处理框架这就是Fork/Join。
Fork/Join的这种思想称之为分治思想,分而治之将一个大任务拆分为多个子任务,将子任务再进行拆分直到无法拆分可以求解,再将所有的子任务结果合并,这就是分治思想。
Fork/Join
Fork/Join这个计算框架中Fork负责将任务进行拆分,Join将任务执行结果进行合并,Fork/Join的组成主要是两个部分,一个是分治任务的任务池ForkJoinPool,一个是任务对象ForkJoinTask,这种关系类似于线程池ThreadPoolExecutor和Runnable的关系,都可以理解为任务对象需要提交到任务池中执行,这种任务对象就是一种特殊的线程对象即可。
ForkJoinTask是一个抽象类,主要方法是fork()创建一个子任务异步执行,join()阻塞当前线程等待子任务执行结果,它存在两个实现类RecursiveTask和RecursiveAction,两个实现类的主要区别在于RecursiveTask支持返回值,而RecursiveAction不支持返回值。
了解这些之后,就可以思考以下问题,用程序如何实现斐波那契数列,什么叫做斐波那契数列呢?简单解释下就是除了前两个数都是1以外,其余的数字都是前面两个数字之和如[1,1,2,3,5,8…]。
如果采用简单的JAVA代码实现,一般采用递归实现如下
public static long fibonacciN(long n) {
System.out.println("进入==="+n);
if (n == 1 || n == 2)
return 1;
return fibonacciN(n - 1) + fibonacciN(n - 2);
}
这种递归的思想不就是我们前面提到的分治思想吗,那么用Fork/Join如何实现呢?
public class ForkJoinDemo {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
FibonacciTest fibonacciTest = new FibonacciTest(6);
Integer result = forkJoinPool.invoke(fibonacciTest);
System.out.println(result);
}
}
class FibonacciTest extends RecursiveTask<Integer>{
Integer num;
public FibonacciTest(Integer num){
this.num = num;
}
@Override
protected Integer compute() {
if (num == 1 || num == 2){
return 1;
}
FibonacciTest f1 = new FibonacciTest(num - 1);
f1.fork();
FibonacciTest f2 = new FibonacciTest(num - 2);
return f1.join() + f2.compute();
}
}
看到这里是不是有疑问为什么f1、f2不都使用fork异步执行呢?首先需要知道的是fork的作用,fork主要会创建一个子任务异步执行,如果f1、f2都异步执行了当前主线程是不是空闲了呢!这样消耗了资源的。具体解释如下:
假设一个酒店中有400个房间,4个清洁工,一个清洁工打扫100个房间那么正好一天可以干完。
Fork/Join的正确模式类似于工人甲分配了400个房间的工作量,这时工人甲认为工作量大,就将其中200个房间的工作量分配给工人乙,这时工人甲还是认为工作量大无法完成,又将200个房间工作量分配100个房间工作量给丙,工人乙将100个房间的工作量分配给丁这时所有的工人都能完成任务,投入的工人是4个。
而如果采用f1、f2都创建子任务异步执行的方式,那么任务分配就变为工人甲将400个房间的工作量分配给乙和丙每个人200个房间的工作量,而乙和丙又将工作量拆分为每人100个房间的工作量分配下去,甲、乙、丙三个人就成为了监工,本来4个工人就可以完成的工作量线程需要7个工人才能完成,还有三个人吃空饷。
所以为了避免上诉情况那么f1、f2就不能都异步执行,需要有一个任务由本线程完成,当然可以换一种写法采用JDK提供的invokeAll方法,invokeAll的N个任务中,其中N-1个任务会交由其它线程执行,会留一个任务给当前线程执行,这样充分利用了线程池,保证没有空闲的线程。
class FibonacciTest extends RecursiveTask<Integer>{
Integer num;
public FibonacciTest(Integer num){
this.num = num;
}
@Override
protected Integer compute() {
if (num == 1 || num == 2){
return 1;
}
FibonacciTest f1 = new FibonacciTest(num - 1);
FibonacciTest f2 = new FibonacciTest(num - 2);
invokeAll(f1,f2);
return f1.join() + f2.join();
}
}
Fork/Join的效率
用Fork/Join实现和用普通方法实现的效果是一样的,唯一点区别在于f1.fork();采用异步子任务执行,但是不是采用了Fork/Join效率就提高很多呢?其实不然,可以如下对比
public class TestForkJoin {
public static void main(String[] args) {
long n = 20;
long start = System.currentTimeMillis();
System.out.println("递归结算结果:" + fibonacciN(n) + " 耗时:" + (System.currentTimeMillis() - start));
System.out.println("================================");
start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
Fibonacci fibonacci = new Fibonacci(n);
Long result = forkJoinPool.invoke(fibonacci);
System.out.println("Fork/Join结果:" + result + " 耗时:" + (System.currentTimeMillis() - start));
}
public static long fibonacciN(long n) {
if (n == 1 || n == 2)
return 1;
return fibonacciN(n - 1) + fibonacciN(n - 2);
}
static class Fibonacci extends RecursiveTask<Long> {
private final long n;
public Fibonacci(long n) {
this.n = n;
}
@Override
protected Long compute() {
if (n <= 1)
return n;
Fibonacci f1 = new Fibonacci(n - 1);
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
return f2.join() + f1.compute();
}
}
}
采用普通递归方式的运行时长却是比Fork/Join的运行时长少的多,这是为什么呢?
原因是Fork/Join执行的任务过于简单,大部分耗时不是在运行程序上面而是消耗在任务调度上面。
ForkJoinPool工作原理
Fork/Join的核心组件就是ForkJoinPool,了解ForkJoinPool就相当掌握了Fork/Join的核心内容,从上面的案例可以看出,ForkJoinPool和ThreadPoolExecutor其实是类似的。
ThreadPoolExecutor本质是消费者-生产者模型,线程池本身是消费者,向线程池提交任务的线程是生产者,线程池内部维持一个任务队列,所有的任务都提交到一个任务队列中,而ForkJoinPool有所不同,ForkJoinPool存在多个任务队列,每个线程根据一定的路由规则将任务提交到其中的一个任务队列中,如果该任务产生了子任务那么子任务也会提交到该线程所在的任务队列中。
如果工作线程所在的任务队列空了,是不是就阻塞了呢?当然不是如果任务队列空了,工作线程同样会找事情做,会获取其它任务队列中的任务,这就叫做任务窃取
Fork/Join实操
MapReduce最著名的入门级别程序便是单词统计,那用Fork/Join应该如何实现呢?
public class Test {
public static void main(String[] args) {
String[] fc = {"hello world", "hello me", "hello fork", "hello join", "fork join in world"};
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
MapReduceDemo mapReduceDemo = new MapReduceDemo(fc,0,fc.length);
Map<String, Integer> result = forkJoinPool.invoke(mapReduceDemo);
System.out.println(result);
}
}
class MapReduceDemo extends RecursiveTask<Map<String,Integer>> {
private String[] fc;
private int start;
private int end;
public MapReduceDemo(String[] fc, int start, int end) {
this.fc = fc;
this.start = start;
this.end = end;
}
@Override
protected Map<String, Integer> compute() {
if (end - start == 1){
return cal(fc[start]);
}else {
int mid = (start + end)/2;
MapReduceDemo m1 = new MapReduceDemo(fc,start,mid);
MapReduceDemo m2 = new MapReduceDemo(fc,mid,end);
invokeAll(m1,m2);
return merge(m1.join(),m2.join());
}
}
private Map<String,Integer> merge(Map<String, Integer> m1,Map<String, Integer> m2){
Map<String,Integer> map = new HashMap<>();
map.putAll(m1);
m2.forEach((k,v)->{
if(map.containsKey(k)){
Integer integer = map.get(k);
map.put(k,v+integer);
}else {
map.put(k,v);
}
});
return map;
}
private Map<String, Integer> cal(String line){
Map<String,Integer> map = new HashMap<>();
String[] words = line.split("\\s+");
for (int i=0;i<words.length;i++){
if (map.containsKey(words[i])){
Integer integer = map.get(words[i]);
map.put(words[i],integer+1);
}else {
map.put(words[i],1);
}
}
return map;
}
}
|