一、算法
1、二分法
假设我们遇到一个规模很大的问题,很难一下子解决。那么要怎么处理呢?
思路一:能不能把这个大规模的问题变为每次减少一点呢?比如从n到n-1。 思路二:既然可以每次减少一点,那能不能每次减少几倍?效果会不会更好?
这里我们介绍思路二中的一个比较有名的算法——二分查找(也叫折半查找)。
具体实现思路:在规模较大的问题n中,查找目标值a,每次折半,然后看目标数值a所在的区间,然后继续折半,直到匹配出具体的值。
2、分治法
思路:把一个规模为n的问题,一直对半拆分,拆到可以求解为止,然后将结果整合起来。
二、实践
fork/join框架
以分治的思想使用线程
fork/join框架是java7提供的用于并行执行任务的框架。
举个例子
public class ForkJoinTestDemo extends RecursiveTask<Long> {
public long start;
public long end;
public ForkJoinTestDemo(){}
public ForkJoinTestDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if((end-start)==0){
System.out.println("fork end,start join:"+end);
return end;
}else {
long mid=(start+end)/2;
log.info("start="+start+",end="+end+",mid="+mid);
ForkJoinTestDemo f1=new ForkJoinTestDemo(start,mid);
f1.fork();
ForkJoinTestDemo f2=new ForkJoinTestDemo(mid+1,end);
f2.fork();
return f1.join()+f2.join();
}
}
public static void main(String[] args) {
ForkJoinPool pool=new ForkJoinPool();
System.out.println(pool.invoke(new ForkJoinTestDemo(1,8)));
}
}
打印结果:
start=1,end=8,mid=4
start=5,end=8,mid=6
start=1,end=4,mid=2
start=5,end=6,mid=5
start=1,end=2,mid=1
start=7,end=8,mid=7
fork end,start join:1
fork end,start join:2
fork end,start join:8
start=3,end=4,mid=3
fork end,start join:3
fork end,start join:4
fork end,start join:5
fork end,start join:6
fork end,start join:7
36
可以看出,分为两个步骤:fork和join。fork阶段做问题拆分,join整合结果。
实现原理
1)类关系
上面的例子中,我们创建的ForkJoinTestDemo类,继承自RecursiveTask。关系图如下: Future:提供了可以获取异步执行的返回值的接口。 ForkJoinTask: 提供在任务中执行fork()和join()操作的机制。主要负责在 ForkJoinWorkerThread 和 ForkJoinPool 方法的中维护其“状态”字段。 RecursiveTask:提供一个带有结果的递归调用。 ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
2)主要方法
compute()
在RecursiveTask中,实现了ForkJoinTask的exec(),在exec()中执行了compute()。并把计算的结果保存下来。
执行任务,成功时返回true。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
V result;
protected final boolean exec() {
result = compute();
return true;
}
}
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
protected abstract boolean exec();
}
fork()
判断是不是一个 ForkJoinWorkerThread 的工作线程,如果是,则将任务加入到内部队列中,否则,由 ForkJoinPool 提供的内部公用的线程池 common 线程池 来执行这个任务。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
join()
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
if ((a = array) != null && (s = top) != base &&
U.compareAndSwapObject
(a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
U.putOrderedInt(this, QTOP, s);
return true;
}
return false;
}
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
int s = 0;
if (task != null && w != null) {
ForkJoinTask<?> prevJoin = w.currentJoin;
U.putOrderedObject(w, QCURRENTJOIN, task);
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
if ((s = task.status) < 0)
break;
if (cc != null)
helpComplete(w, cc, 0);
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task);
if ((s = task.status) < 0)
break;
long ms, ns;
if (deadline == 0L)
ms = 0L;
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
if (tryCompensate(w)) {
task.internalWait(ms);
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s;
}
三、总结
- fork/join框架是使用分治法的思想,来合理利用线程。
- fork/join框架作为并发框架,与单一线程执行相比不一定会快,要做好相关压测和调优。
- 使用“工作窃取”的方式,优化多线程下的任务执行。使用Unsafe直接操作实现原子性。
|