Fork-Join
Parallel Stream实现任务的切分,并将任务提交到全局的ForkJoinPool线程池中执行,注意,是全局的线程池。关于ForkJoinPool,我这里简单介绍下。
在Fork-Join中,比如一个拥有4个线程的ForkJoinPool线程池,有一个任务队列,一个大的任务切分出的子任务会提交到线程池的任务队列中,4个线程从任务队列中获取任务执行,哪个线程执行的任务快,哪个线程执行的任务就多,只有队列中没有任务线程才是空闲的,这就是工作窃取。可以这样理解工作窃取,比如有4个人干8件事情,理应每个人干2件,但干活快的干完自己的事情后可以去帮别人干。
正如图中所示,一个任务可以fork中很多个子任务,当然不只是图中看到的只有左右两个。假设,每个任务都只fork出两个子任务,如果负责fork子任务的当前任务不做任何事情,那么最终将只有叶子节点真正做事情,其它节点都只是负责fork子任务与合并结果(假设是有返回值的任务)。
如果是没有返回值的任务,是没有图中“合并结果”这个流程的;而且,也不是必须要等待子任务执行完成。这些都是根据自己的需求来自定义使用的。要灵活去使用。
比如,处理一个1+2+3+…+100的加法运算任务,就需要获取返回值,而切分任务我们可以这样切分:每次除2切分左右两个子任务,如100/2=50,1到50的相加由左子任务完成,51到100的由右子任务完成,1到50同理继续除2切分出子任务,切到只剩一个数的时候就返回。没错,我说的就是归并算法。核心逻辑代码实现如下。
MyForkTask leftTask = new MyForkTask(numbers);
leftTask.fork();
MyForkTask rightTask = new MyForkTask(numbers);
rightTask.fork();
count += leftNextPullTask.join();
count += rightNextPullTask.join();
对了,既然是线程池,那肯定是要提交任务。前面说了Fork-Join支持切分的任务分有返回值和没有返回值两种,任务是分别对应实现RecursiveTask接口与RecursiveAction接口。关于Fork-Join就说这么多吧。
Parallel Stream并行流需要注意的问题
一个是使用.parallelStream()之后,在接下来的管道中做任何业务逻辑都需要确保线程安全,比如:
List<Int..> result = new ArrayList<>();
tmpList.parallerStream()
.foEach(item -> {
result.add(item);
});
由于ArrayList并不是线程安全的,这样使用就会出现线程安全问题,所以注意了,使用parallerStream必须确保线程安全问题。
可能很多人都像我一样,自从用了stearm之后就很少写for循环了,这不是一个好的习惯。比如我只是简单的遍历一遍int数组,那就不要使用stearm,直接使用for循环性能会更好,因为stream你只是用着简单,但你看下源码,封装很多复杂逻辑,原本只是简单的数组遍历,你用stream之后将会创建出很多中间对象,还有一大堆的流式调用逻辑。
你以为这样就完了吗?还有更恐怖的线程安全问题。在并发量高的接口中不要直接使用stream的parallerStream处理耗时的逻辑,因为并行流运行时,内部使用的fork-join线程池是整个JVM进程全局唯一的线程池。而这个线程池其默认线程数为处理器核心数。
Runtime.getRuntime().availableProcessors()
可以通过配置修改这个值 :
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
一般不建议修改,最好是自己创建一个Fork-Join线程池来用,因为你如果改了,虽然对你当前的业务逻辑来说,算是调优了,但对于项目中其它地方只是用来做非耗时的并行流运算,性能就差了。
,由于所有使用并行流parallerStream的地方都是使用同一个Fork-Join线程池,而线程池线程数仅为cpu的核心数。我们可以来写个例子验证是不是整个java进程使用的parallerStream都是用的同一个进程,我这里提供例子,不相信的可以自己拿去跑下看看。
public class PStream {
public static void main(String[] args) throws InterruptedException {
final List<Integer> list = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
list.add(1);
}
for (int i = 1; i <= 50; i++) {
new Thread("test-" + i) {
String currentThreaName = this.getName();
@Override
public void run() {
list.parallelStream()
.forEach(numbser -> {
Thread c = Thread.currentThread();
System.out.println(currentThreaName + "===> "
+ c.getClass().getName() + ":" + c.getName() + ":" + c.getId());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}.start();
}
Thread.sleep(Integer.MAX_VALUE);
}
}
程序运行结果部分截图
你可能会被线程id62、63、64吓到,因为for循环创建了50个线程,jvm启动后自身也会创建一些线程,比如gc线程。所以全局Fork-Join线程池的线程id是从62开始的几个。
你还可以去调试看源码。
对应forEach流
ForEachOps::compute方法打个断点,
或者直接forEach方法的输出语句打个断点,找到ForkJoinWorkerThread类
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool;
final ForkJoinPool.WorkQueue workQueue;
public void run() {
pool.runWorker(workQueue);
......
}
}
假设,分布式服务中(rpx框架:dubbo),有一个接口,用于批量处理数据,如果每次消费者调用都用了批量处理1000条记录的过滤,假设一条记录的过滤逻辑需要耗时4ms( 涉及到redis缓存的读),如果有40个请求并发过滤,那就是40000条记录交给2个线程去处理(cpu核心线程数),你猜下结果是什么?结果是,服务消费端报错,一堆的接口调用超时异常,导致服务雪崩。后果很严重。原因你猜到了吗?
40个请求开启40个并行流parallerStream,40个并行流parallerStream使用同一个只有2个线程的Fork-Join线程池(2核8g机器),意味着40个请求争抢着执行任务。
假设一条记录的过滤耗时为4ms,在串行的情况下1000条记录应该只是4000ms。但如果是400000条记录争抢2个线程执行,我们转变一下,假设每线程每200000记录执行,由于是无序的,但可以想象对请求来说任务是被交替执行完成的。什么意思呢,比如当前执行1号请求的第一个任务,执行完后切换到2号个请求的第一个任务,接着3号请求的第一个任务,一轮完成后接着是1号请求的第二个任务…所以,最坏的情况下,一个请求需要200000*4ms才能执行完成。就会导致接口调用超时。
总之,不要在高并发的接口中使用并行流,直接使用处理请求的线程执行就行,如果有需要,那就全局创建一个Fork-Join线程池自己切分任务来执行。
刚刚说的例子只是40个并发,实现项目中都是上千上万的并发请求,如果这样使用并行流,服务直接崩掉。
假设用的dubbo默认配置200个工作线程,那么是200个线程处理业务逻辑快呢,还是将200个线程的请求都交给只有2个线程的线程池处理快呢?毫无疑问。
总结
那些耗时很长的任务,请不要使用parallerStream。假设原本一个任务执行需要1分钟时间,有10个任务并行执行,如果你偷懒,只是使用parallerStream来将这10个任务并行执行,那你这个jvm进程中,其它同样使用parallerStream的地方也会因此被阻塞住,严重的将会导致整个服务瘫痪。
关于stream的并行流parallerStream使用注意事项就说到这。切记,请不要乱用并行流,在使用之前一定、一定、一定要考虑清楚任务是否耗时,有i/o操作的一定不要使用并行流,有线程休眠的也一定不要使用并行流,原本就只有两个线程,还搞休眠,等着整个服务崩溃咯。
除此之外,我们可以自定义线程池,让Java8中的parallel stream 使我们的线程池,而不是使用公共的。这个技巧是基于ForkJoinTask.fork,它指定:"安排在当前任务运行的池中异步执行此任务(如果适用),或者如果不是inForkJoinPool()则使用ForkJoinPool.commonPool()"
使用案例如下:
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() +"" + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
具体的内容参考如下文章:
并行流与Fork-Join
Java8 parallelStream —— 替换默认的共享线程池ForkJoinPool.commonPool()
|