并行度概念
并行度可以认为同时处理数据的子任务数,在大数据场景下,我们都是依赖分布式框架做并行计算,从而提高数据的吞吐量。Flink中实现任务并行的方法就是将一个算子操作复制到多个节点(或者线程),当数据到来时,就可以到其中任何一个节点上执行。像这样将一个任务拆分到多个并行的子任务,分发到不同节点,就真正实现了并行计算。 包含并行子任务的数据流就是并行数据流,它需要多个分区来分配并行任务。一般情况下,一个流程序的并行度=所有算子中最大的并行度。一个程序中,不同的算子可能会有不同的并行度。就比如下图中,出了sink算子,其他算子的并行度都是2,所以该数据流的并行度就是2.
并行度设置
代码中设置
设置当前算子的并行度
我们在代码中,可以很简单地在算子后跟着调用 setParallelism()方法,来设置当前算子的并行度。这种方式设置的并行度,只针对当前算子有效。
stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);
设置当前环境的并行度
这样代码中所有算子,默认的并行度就都为 2 了。我们一般不会在程序中设置全局并行度, 因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
提交应用时设置
在使用 flink run 命令提交应用时,可以增加-p 参数来指定当前应用程序执行的并行度, 它的作用类似于执行环境的全局设置。如果我们直接在Web UI 上提交作业,也可以在对应输入框中直接添加并行度。
bin/flink run –p 2 –c 全类名 jar包
配置文件中设置
我们还可以直接在集群的配置文件 flink-conf.yaml 中直接更改默认并行度: parallelism.default: 2 这个设置对于整个集群上提交的所有作业有效,初始值为 1。无论在代码中设置、还是提交时的-p 参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的 CPU 核心数。这也就解释了为什么我们在第二章运行 WordCount 流处理程序时,会看到结果前有 1~4 的分区编号——运行程序的电脑是 4 核CPU,那么开发环境默认的并行度就是 4。
并行度优先级
(1) 对于一个算子,首先看在代码中是否单独指定了它的并行度,这个特定的设置优先 级最高,会覆盖后面所有的设置。 (2) 如果没有单独设置,那么采用当前代码中执行环境全局设置的并行度。 (3) 如果代码中完全没有设置,那么采用提交时-p 参数指定的并行度。 (4) 如果提交时也未指定-p 参数,那么采用集群配置文件中的默认并行度。 实际工作中,我们是在代码中只针对算子设置并行度,不设置全局并行度,这样方便我们提交作业时进行动态扩容。
|