上一篇文章对stream流的串行编程进行了详细的介绍,通过stream流可轻松搞定复杂的数据处理操作,但是,如果数据量很大时,中间的部分操作其实可以通过多线程的并行操作,从而提高效率。
上一篇:java8新特性——轻松玩转stream流的数据操作(串行) java8新特性专栏:
0. 引言
- 并行(Parallelism):强调的是将一个大任务分解为多个小任务后,为减少IO等待时间等,通过轮询执行这些小任务,得到多个中间结果后再汇总为一个最终结果。
- 并发(Concurrency):强调的是在同一时间开始执行多个任务,通常会涉及多线程之间的上下文切换
但在多CPU和分布式的时代,并发和并行的概念联系越来越紧密。至少在Java的Stream中,我们可以将并发和并行理解为同一个意思:基于多线程技术,对一个大任务分拆为多个小任务,分配到不同的线程中执行,从而提高效率,得到多个中间结果后再汇总为一个最终结果。
Stream的并行编程,底层是基于 ForkJoinPool 技术来实现的。ForkJoinPool是Java 7引入的用于并行执行的任务框架,核心思想是将一个大任务拆分成多个小任务(即fork),然后再将多个小任务的处理结果汇总到一个结果上(即join)。但是java7中的Fork-Join模式编程过程相对复杂,java8将这种模式应用于stream流就大大简化了开发代码。
1.Fork-Join模式
1.1 Fork/Join 框架
将一个大任务, 进行拆分(fork)成若干个小任务(拆到不可再拆时),再通过递归的方式,将一个个的小任务运算的结果进行 join 汇总.
采用 “工作窃取” 模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行, 并将小任务加到线程队列中, 然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。减少子任务等待时间,从而提高整体的效率。
1.2 Fork-Join & Map-Reduce 区别
Fork-Join 与大数据中的Map-Reduce思想类似,但是也有一定区别:
- 应用场景不同:
- F-J似乎设计为在单个Java VM上工作,
- M-R则明确设计为在大型机器集群上工作
- 划分子任务方式不同:
- F-J提供将任务分割成多个子任务的设施,以递归的方式;更多层次,在这个阶段“跨叉”通信的可能性
- M-R只做一个大的分裂,映射分裂在彼此之间不说话,然后一起减少一切。单层,没有分裂之间的通信,直到减少,并可大规模扩展。
1.3 程序举例
这里通过一个累加程序来测试,以1-300亿举例
- RecursiveTask : 有返回值
- RecursiveAction :无返回值
public class ForkJoinCalculate extends RecursiveTask<Long> {
private static final long serialVersionUID = 12345678925L;
private long start;
private long end;
private static final long THRESHOLD = 10000L;
public ForkJoinCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
if (length <= THRESHOLD) {
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
} else {
long mid = (start + end) / 2;
ForkJoinCalculate left = new ForkJoinCalculate(start, mid);
left.fork();
ForkJoinCalculate right = new ForkJoinCalculate(mid+1, end);
right.fork();
return left.join() + right.join();
}
}
}
三种提交任务的区别:
- execute(ForkJoinTask) 异步执行tasks,无返回值
- invoke(ForkJoinTask) 有Join, tasks会被同步到主进程,任务执行完毕后才回到主线程
- submit(ForkJoinTask) 异步执行,且带Task返回值,可通过task.get 实现同步到主线程
@Test
public void test15() {
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinCalculate(1, 30000000000L);
Long invoke = pool.invoke(task);
Instant end = Instant.now();
System.out.println(Duration.between(start,end).toMillis());
}
@Test
public void test16() {
Instant start = Instant.now();
long num = 30000000000L;
long sum = 0L;
for (long i = 1; i <= num; i++) {
sum += i;
}
Instant end = Instant.now();
System.out.println(Duration.between(start,end).toMillis());
}
2. 并行流的创建
- 通过 Collection.parallelStream 方法可以得到一个并行流
- 串行&并行之间的转换,在中间操作中可随意转换
- BaseStream.parallel() 串 -> 并
- BaseStream.sequential() 并 -> 串
- BaseStream.isParallel() 可以判断一个 stream 是否是并行流。
Instant start = Instant.now();
OptionalLong reduce = LongStream.rangeClosed(0, 30000000000L).parallel().reduce(Long::sum);
System.out.println(reduce.getAsLong());
Instant end = Instant.now();
System.out.println(Duration.between(start,end).toMillis());
3. 顺序性
- 有序:List 和 Array生成的stream都是ordered stream,而使用 BaseStream.unordered() 方法则可以解除顺序限制,变为 unordered stream。
- 无序:HashSet 生成的stream是unordered stream,可以通过排序方法 sort() 强行给stream添加一个 encounter order的约束,变为 ordered stream。
注意:
- unordered并不会打乱顺序,只是解除限制,不再保证顺序,然后某些操作可以做一些特殊优化
- 以最常见的Stream.forEach 为例,在并行执行的时候,即使数据源是List,forEach方法处理元素的顺序也是无序的。要保证处理顺序,需要使用方法 Stream.forEachOrdered。
4.线程安全
4.1 纯函数
纯函数(purely function):函数调用不会改变函数以外的其它状态,换而言之,即函数调用不会改变在该函数之外定义的变量值。
要保证数据安全,就要求流操作中要全部使用纯函数操作。
以下在并行时,就不能保证数据安全
ArrayList<String> results = new ArrayList<>();
provinces.parallelStream()
.filter(s -> !s.startsWith("G"))
.forEach(s -> results.add(s));
应该为以下形式:
List<String> provinces = Arrays.asList("Guangdong", "Jiangsu", "Guangxi", "Jiangxi", "Shandong");
List<String> results = provinces.parallelStream()
// 过滤掉以 G 开头的省份
.filter(s -> !s.startsWith("G"))
// 没有 "副作用"
.collect(Collectors.toList());
4.2 规约操作(reduce)
T reduce(T identity, BinaryOperator<T> accumulator);
- identity:是规约操作的初始值。对任意值 t,要满足 accumulator.apply(identity, t) == t 。否则,会导致错误的结果。
- accumulator: 二元表达式,要求满足结合律(associative),否则在一些顺序不确定的或并行的场景中会导致不正确的结果。这是因为并行的时候可能会有多个元素和初始值进行运算操作。
4.3 收集器(collect)
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
- R:回值的类型,通常是一个容器类(例如 Collection 或 Map)。
- T 是Stream中的元素类型。
- supplier:是用来创建一个容器实例的函数。
- accumulator:将Stream中的一个元素合并到容器中的函数。
- combiner 是将两个容器归并为一个容器的函数,只在并行执行的时候用到。
在并行执行的场景下,我们有一些额外的要求:
- combiner函数满足结合律
- 要求combiner 和 accumulator 是兼容的(compatible),即对于任意的r和t, combiner.accept(r, accumulator.accept(supplier.get(), t)) == accumulator.accept(r, t)
|