执行模式(批处理/流处理)
DataStream API 支持不同的运行时执行模式,您可以根据用例的要求和作业的特征从中进行选择。这是DataStream API的“经典”执行行为,我们称之为STRIMING 执行模式。这应该用于连续增量处理并希望无限期保持在线的无界作业。
此外,还有一种批处理式执行模式,我们称之为 BATCH 执行模式。可以让你联想到 MapReduce 等批处理框架的方式执行作业。这将用于具有已知固定输入并且不会连续运行的有界作业。
Apache Flink对流和批处理的统一方法意味着,不管配置的执行模式如何,在有界输入上执行的DataStream 应用程序都会产生相同的最终结果。重要的是要注意这里的final含义:以STREAMING 模式执行的作业可能会产生增量更新(想想数据库中的upserts),而BATCH 作业最终只会产生一个最终结果。如果计算正确,最终结果将是相同的,但实现方法可能不同。
通过启用BATCH ,我们允许flink应用额外的优化,只有当我们知道输入的是有界流,我们才可以做这样的优化,例如可以使用不同的join/aggregation 策略,在此基础上还可以拥有不同的shuffer 实现和更有效的任务调度和故障恢复行为。
什么时候可以/应该使用 BATCH 执行模式?
BATCH 执行模式只能用于有界的 Jobs/Flink 程序。有界性是数据源的一个属性,它告诉我们来自该数据源的所有输入在执行之前是否已知,或者是否会有新的数据出现(可能是无限期的)。反过来,如果一个job的所有源都是有界的,那么它就是有界的,否则就是无界的。
另一方面,STREAMING 执行模式即可用于有界也可以用于无界作业。
根据经验,当您的程序有界时,您应该使用BATCH 执行模式,因为这将更高效。当你的程序是无界的时候,你必须使用STREAMING 执行模式,因为只有这种模式才可以处理连续的数据流。
配置BATCH执行模式
执行模式可以通过 execution.runtime-mode 设置进行配置。存在三个可能的值:
STREAMING : 标准的 DataStream 执行模式(默认)BATCH : DataStream API上的批处理模式AUTOMATIC : 让系统根据源的有界性来决定
这可以通过 bin/flink run ... 的命令行参数进行配置,或者在创建/配置 StreamExecutionEnvironment 时以编程方式进行配置。
以下是通过命令行配置执行模式的方法:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
此示例显示如何在代码中配置执行模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
我们建议用户不要在他们的程序中设置运行时模式,而是在提交应用程序时使用命令行设置它。不在应用代码中配置将获得更大的灵活性,因为相同的应用程序可以在任何执行模式下执行。
Execution Behavior(执行行为)
本节概述了 BATCH 执行模式的执行行为,并将其与 STREAMING 执行模式进行了对比。详情请参考介绍此功能的 FLIP-134 and FLIP-140。
任务调度和网络shuffle
Flink作业由不同的操作组成,这些操作在数据流图中连接在一起。系统决定如何在不同的进程/机器(taskmanager )上调度这些操作的执行,以及数据如何在它们之间进行shuffle 。
可以使用称为chaining的功能将多个操作/操作符链接在一起。Flink将一个或多个(链式)操作符组成的组视为调度单元,称为Task 。通常,term subTask 是指在多个taskmanager 上并行运行的单个任务实例,但我们在这里只使用 term task 。
对于 BATCH 和 STREAMING 执行模式,任务调度和网络shuffle 的工作方式不同。主要是因为我们知道我们的输入数据在 BATCH 执行模式下是有界的,这允许 Flink 使用更高效的数据结构和算法。
我们将通过这个例子来解释任务调度和网络传输的区别:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.fromElements(...);
source.name("source")
.map(...).name("map1")
.map(...).name("map2")
.rebalance()
.map(...).name("map3")
.map(...).name("map4")
.keyBy((value) -> value)
.map(...).name("map5")
.map(...).name("map6")
.sinkTo(...).name("sink");
在操作之间隐含一对一连接模式的操作,如map()、flatMap() 或filter() ,可以直接将数据转发给下一个操作,因此允许将这些操作链接在一起。这意味着Flink通常不会在它们之间进行网络shuffle 。
另一方面,诸如 keyBy() 或 rebalance() 之类的操作需要在不同的并行任务实例之间打乱数据。这会导致网络shuffle 。
对于上面的例子,Flink会像以下这样将操作分组为任务:
- Task1: source, map1, and map2
- Task2: map3, map4
- Task3: map5, map6, and sink
我们在任务 1 和任务 2 以及任务 2 和 3 之间进行了网络shuffle 。以下是该Job的直观表示:
在STREAMING 执行模式下,所有任务都需要一直运行。这允许Flink通过整个管道立即处理新记录,我们需要连续和低延迟的流处理。这也意味着分配给作业的taskmanager 需要有足够的资源来同时运行所有任务。
网络shuffle 是流水线式的,这意味着记录会在网络层进行一些缓冲并立即发送到下游任务。
批执行模式
在批处理执行模式下,作业的任务可以被划分为几个阶段,这些阶段可以一个接一个地执行。我们可以这样做,因为输入是有界的,因此Flink可以在进入下一个阶段之前将当前阶段全部处理完毕。在上面的示例中,作业将有三个阶段,对应于三个任务,这三个任务由shuffle 分隔的。
与上面针对流模式所述的立即将记录发送到下游任务不同,分阶段处理需要Flink将任务的中间结果持久化到存储介质中,从而允许下游任务在上游任务已经离线后读取这些结果。这会增加处理的延迟,但会带来其他有趣的特性。首先,这允许Flink在发生故障时恢复到最新的可用结果,而不是重新启动整个作业。另一个好处用是批处理作业可以在更少的资源上执行(相对于TaskManager 的可用Slot 而言),因为系统可以依次执行任务。
只要下游任务没有消耗它们,TaskManagers 就会保留中间结果。 之后,只要空间允许,它们将被保留,以便在发生故障时回溯到更早的结果。
状态后端/状态
在 STREAMING 模式下,Flink 使用 StateBackend 来控制状态的存储方式以及检查点的工作方式。
在BATCH 模式下,配置的状态后端将会被忽略。相反,keyed 操作的输入key 分组(使用排序),然后我们依次处理key 对应的所有记录。这允许同时只保留一个key 的状态。当移动到下一个键key 时,上一个给定key 的状态将被丢弃。
有关这方面的相关信息,请参阅 FLIP-140 。
Order of Processing(处理顺序)
在操作符或用户定义函数(udf) 中处理记录的顺序在BATCH 和STREAMING 执行之间可能不同。
在STREAMING 模式下,用户定义的函数不应对传入记录的顺序进行任何假设。数据一到达就被处理。
BATCH 执行模式,有一些操作可以使记录在flink中保证序的。
我们可以区分三种一般类型的输入:
- **
broadcast input(广播输入) **通过广播流输入 (请查看 Broadcast State) regular input(常规输入) :既不是广播也不是键控的输入keyed inpu(键控输入) :来自 KeyedStream 的输入
使用多个输入类型的函数或操作符将按照以下顺序处理它们:
- 首先处理广播输入
- 第二个处理常规输入
- 最后处理键控输入
对于使用多个常规或广播输入的函数(如CoProcessFunction 函数),Flink有权以任何顺序处理该类型输入的数据。
对于使用多个keyed 输入的函数(如KeyedCoProcessFunction ),Flink会处理所有keyed 输入中单个key 的所有记录,然后再转到下一个key 控输入。
Event Time / Watermarks(时间时间 / 水印)
基于 event time的flink的流式程序,在流运行时Flink对事件产生的顺序做一个悲观的假设,即时间戳为 t 的事件可能出现在时间戳为 t+1 的事件之后。因为系统永远无法确定小于给定的时间戳 T 的元素是否会再次出现。为了抵消这种无序性对最终结果的影响,同时是系统更加实用,在STRIMING 模式下flink使用一种名为Watermarks 的机制。
在BATCH 模式中,输入数据集是预先知道的,因此不需要这样的机制,至少,元素可以按时间戳排序,以便按照时间顺序处理它们。
Processing Time(处理时间)
Processing Time 是处理记录的机器上的挂钟时间。根据这个定义,我们可以看到基于Processing Time 的计算结果是不重复的。这是因为处理两次的相同记录将有两个不同的时间戳。
尽管如此,在STREAMING模式下使用处理时间还是很有用的。原因在于,流管道经常实时地接收它们的无界输入,因此事件时间和处理时间之间存在相关性。此外,由于上述原因,在STREAMING模式下,事件时间的1h往往可以接近处理时间或挂钟时间的1h。因此,使用处理时间可以用于提前(不完全)触发计算,从而给出预期结果的提示。
这种相关性在输入数据集是静态且事先已知的批处理模式中不存在。鉴于此,在 BATCH 模式下,允许用户请求当前处理时间并注册处理时间计时器,但是,在event-time 的情况下,所有计时器都将在输入结束时触发。
故障恢复
在STREAMING 执行模式下,Flink使用检查点进行故障恢复。查看checkpointing documentation 文档,了解有关此操作的文档以及如何配置它。还有一个关于通过状态快照进行容错的介绍部分,这将是从更高的层次来解释这些概念。
故障恢复检查点的特征之一是,在出现故障时,Flink将从检查点重新启动所有正在运行的任务。这可能比我们在BATCH 模式下所做的工作成本更高(如下所述),这也是如果您的工作允许的话,您应该使用BATCH 执行模式的原因之一。
在BATCH 执行模式下,Flink将尝试回溯到中间结果仍然可用的前一个处理阶段。与从检查点重新启动所有任务相比,可能只有失败的任务(或图中它们的前身)必须重新启动,这可以提高作业的处理效率和总体处理时间。
重要注意事项
与传统的STREAMING 式执行模式相比,在BATCH 模式中,有些东西可能无法按预期工作。有些特性的工作方式略有不同,而其他特性则不受支持。
BATCH 模式下的行为变化:
- “滚动”操作,如 reduce() 或 sum() 操作, STREAMING 模式下flink会为到达的新记录发出增量更新。在
BATCH 模式下,这些操作不是“滚动”的。他们只发出最终结果。
在 BATCH 模式下不支持:
检查点
如上所述,BATCH 处理程序的故障恢复不使用检查点。
重要的是要记住,因为没有检查点,某些功能(例如 CheckpointListener 以及因此 Kafka 的 EXACTLY_ONCE 模式或 StreamingFileSink 的 OnCheckpointRollingPolicy 将不起作用)。如果您需要在批处理模式下使用事务DataSink,请确保使用FLIP-143中建议的统一DataSink API。
|