Flink是标准的实时处理引擎,而且Spark的两个模块Spark Streaming和Structured Streaming都是基于微批处理的,不过现在Spark Streaming已经非常稳定基本都没有更新了,然后重点移到spark sql和structured Streaming了。
维表join和异步IO
Structured Streaming不直接支持与维表的join操作,但是可以使用map、flatmap及udf等来实现该功能,所有的这些都是同步算子,不支持异步IO操作。但是Structured Streaming直接与静态数据集的join,可以也可以帮助实现维表的join功能,当然维表要不可变。
Flink支持与维表进行join操作,除了map,flatmap这些算子之外,flink还有异步IO算子,可以用来实现维表,提升性能。
状态管理
状态维护应该是流处理非常核心的概念了,比如join,分组,聚合等操作都需要维护历史状态。那么flink在这方面很好,structured Streaming也是可以,但是spark Streaming就比较弱了,只有个别状态维护算子upstatebykye等,大部分状态需要用户自己维护,虽然这个对用户来说有更大的可操作性和可以更精细控制但是带来了编程的麻烦。flink和Structured Streaming都支持自己完成了join及聚合的状态维护。
Structured Streaming有高级的算子,用户可以完成自定义的mapGroupsWithState和flatMapGroupsWithState,可以理解类似Spark Streaming 的upstatebykey等状态算子。
就拿mapGroupsWithState为例:
由于Flink与Structured Streaming的架构的不同,task是常驻运行的,flink不需要状态算子,只需要状态类型的数据结构。
首先看一下Keyed State下,我们可以用哪些原子状态:
-
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。 -
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。 -
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用 -
reduceFunction,最后合并到一个单一的状态值。 -
FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。 -
MapState:即状态值为一个map。用户通过put或putAll方法添加元素。
Join操作
Flink的join操作
flink的join操作没有大的限制,支持种类丰富,比如:
Inner Equi-join
SELECT?*?FROM?Orders?INNER?JOIN?Product?ONOrders.productId?=?Product.id
Outer Equi-join
SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId =Product.id
SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId =Product.id
SELECT *
FROM Orders FULL OUTER JOIN Product ONOrders.productId = Product.id
Time-windowed Join
SELECT?*?FROM?Oderso,Shipmentss?WHEREo.id=s.orderIdAND?o.ordertimeBETWEENs.shiptime?INTERVAL'4'HOURANDs.shiptime
Expanding arrays into a relation
SELECT users, tag FROM?Orders?CROSS?JOIN?UNNEST(tags)?AS?t?(tag)
Join with Table Function
Inner Join
A row of the left (outer) table is dropped, if its table function call returns an empty result. SELECT users, tag FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
Left Outer Join If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values.
SELECT users, tag FROM?Orders?LEFT?JOIN?LATERAL?TABLE(unnest_udtf(tags))?t?AS?tag?ON?TRUE
Join with Temporal Table
SELECT o_amount, r_rate FROM Orders, LATERAL TABLE (Rates(o_proctime)) WHERE ?r_currency?=?o_currency
Structured Streaming的join操作
Structured Streaming的join限制颇多了,限于篇幅问题在这里只讲一下join的限制。具体如下表格:
容错机制及一致性语义
本节内容主要是想对比两者在故障恢复及如何保证仅一次的处理语义。这个时候适合抛出一个问题:实时处理的时候,如何保证数据仅一次处理语义?
Spark Streaming 保证仅一次处理
对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰一次处理语义。
对于 Spark Streaming 与 kafka 结合的 direct Stream 可以自己维护 offset 到 zookeeper、kafka 或任何其它外部系统,每次提交完结果之后再提交 offset,这样故障恢复重启可以利用上次提交的 offset 恢复,保证数据不丢失。但是假如故障发生在提交结果之后、提交 offset 之前会导致数据多次处理,这个时候我们需要保证处理结果多次输出不影响正常的业务。
由此可以分析,假设要保证数据恰一次处理语义,那么结果输出和 offset 提交必须在一个事务内完成。在这里有以下两种做法:
repartition(1) Spark Streaming 输出的 action 变成仅一个 partition,这样可以利用事务去做:
Dstream.foreachRDD(rdd=>{ rdd.repartition(1).foreachPartition(partition=>{ // 开启事务 partition.foreach(each=>{// 提交数据 }) // 提交事务 }) })
?
也就是结果数据包含 offset。这样提交结果和提交 offset 就是一个操作完成,不会数据丢失,也不会重复处理。故障恢复的时候可以利用上次提交结果带的 offset。
Flink 与 kafka 0.11 保证仅一次处理
若要 sink 支持仅一次语义,必须以事务的方式写数据到 Kafka,这样当提交事务时两次 checkpoint 间的所有写入操作作为一个事务被提交。这确保了出现故障或崩溃时这些写入操作能够被回滚。
在一个分布式且含有多个并发执行 sink 的应用中,仅仅执行单次提交或回滚是不够的,因为所有组件都必须对这些提交或回滚达成共识,这样才能保证得到一致性的结果。Flink 使用两阶段提交协议以及预提交(pre-commit)阶段来解决这个问题。
本例中的 Flink 应用如图 11 所示包含以下组件:
下面详细讲解 flink 的两段提交思路:
如图 12 所示,Flink checkpointing 开始时便进入到 pre-commit 阶段。具体来说,一旦 checkpoint 开始,Flink 的 JobManager 向输入流中写入一个 checkpoint barrier ,将流中所有消息分割成属于本次 checkpoint 的消息以及属于下次 checkpoint 的,barrier 也会在操作算子间流转。对于每个 operator 来说,该 barrier 会触发 operator 状态后端为该 operator 状态打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 传递到后续的 operator。
这种方式仅适用于 operator 仅有它的内部状态。内部状态是指 Flink state backends 保存和管理的内容(如第二个 operator 中 window 聚合算出来的 sum)。
当一个进程仅有它的内部状态的时候,除了在 checkpoint 之前将需要将数据更改写入到 state backend,不需要在预提交阶段做其他的动作。在 checkpoint 成功的时候,Flink 会正确的提交这些写入,在 checkpoint 失败的时候会终止提交,过程可见图 13。
当结合外部系统的时候,外部系统必须要支持可与两阶段提交协议捆绑使用的事务。显然本例中的 sink 由于引入了 kafka sink,因此在预提交阶段 data sink 必须预提交外部事务。如下图:
当 barrier 在所有的算子中传递一遍,并且触发的快照写入完成,预提交阶段完成。所有的触发状态快照都被视为 checkpoint 的一部分,也可以说 checkpoint 是整个应用程序的状态快照,包括预提交外部状态。出现故障可以从 checkpoint 恢复。下一步就是通知所有的操作算子 checkpoint 成功。该阶段 jobmanager 会为每个 operator 发起 checkpoint 已完成的回调逻辑。
本例中 data source 和窗口操作无外部状态,因此该阶段,这两个算子无需执行任何逻辑,但是 data sink 是有外部状态的,因此,此时我们必须提交外部事务,如下图:
以上就是 flink 实现恰一次处理的基本逻辑。
背压
消费者消费的速度低于生产者生产的速度,为了使应用正常,消费者会反馈给生产者来调节生产者生产的速度,以使得消费者需要多少,生产者生产多少。
Spark Streaming 的背压
Spark Streaming 跟 kafka 结合是存在背压机制的,目标是根据当前 job 的处理情况来调节后续批次的获取 kafka 消息的条数。为了达到这个目的,Spark Streaming 在原有的架构上加入了一个 RateController,利用的算法是 PID,需要的反馈数据是任务处理的结束时间、调度时间、处理时间、消息条数,这些数据是通过 SparkListener 体系获得,然后通过 PIDRateEsimator 的 compute 计算得到一个速率,进而可以计算得到一个 offset,然后跟限速设置最大消费条数比较得到一个最终要消费的消息最大 offset。
PIDRateEsimator 的 compute 方法如下:
def compute( time: Long, // in milliseconds numElements: Long, processingDelay: Long, // in milliseconds schedulingDelay: Long // in milliseconds ): Option[Double] = { logTrace(s"\ntime = $time, # records = $numElements, " + s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000 val processingRate = numElements.toDouble / processingDelay * 1000 val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - integral * historicalError - derivative * dError).max(minRate) logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) latestTime = time if (firstRun) { latestRate = processingRate latestError = 0D firstRun = false logTrace("First run, rate estimation skipped") None } else { latestRate = newRate latestError = error logTrace(s"New rate = $newRate") Some(newRate) } } else { logTrace("Rate estimation skipped") None } } }
Flink 的背压
与 Spark Streaming 的背压不同的是,Flink 背压是 jobmanager 针对每一个 task 每 50ms 触发 100 次 Thread.getStackTrace() 调用,求出阻塞的占比。过程如图 16 所示:
阻塞占比在 web 上划分了三个等级:
OK: 0 <= Ratio <= 0.10,表示状态良好;
LOW: 0.10 < Ratio <= 0.5,表示有待观察;
HIGH: 0.5 < Ratio <= 1,表示要处理了。
表管理
flink和structured streaming都可以讲流注册成一张表,然后使用sql进行分析,不过两者之间区别还是有些的。
Structured Streaming将流注册成临时表,然后用sql进行查询,操作也是很简单跟静态的dataset/dataframe一样。
df.createOrReplaceTempView("updates") spark.sql("select?count(*)?from?updates")
其实,此处回想Spark Streaming 如何注册临时表呢?在foreachRDD里,讲rdd转换为dataset/dataframe,然后将其注册成临时表,该临时表特点是代表当前批次的数据,而不是全量数据。Structured Streaming注册的临时表就是流表,针对整个实时流的。Sparksession.sql执行结束后,返回的是一个流dataset/dataframe,当然这个很像spark sql的sql文本执行,所以为了区别一个dataframe/dataset是否是流式数据,可以df.isStreaming来判断。
当然,flink也支持直接注册流表,然后写sql分析,sql文本在flink中使用有两种形式:
1). tableEnv.sqlQuery("SELECT product,amount FROM Orders WHERE product LIKE '%Rubber%'")
2). tableEnv.sqlUpdate( "INSERT?INTO?RubberOrders?SELECT?product,?amount?FROM?Orders?WHEREproduct?LIKE?'%Rubber%'");
对于第一种形式,sqlQuery执行结束之后会返回一张表也即是Table对象,然后可以进行后续操作或者直接输出,如:result.writeAsCsv("");。 而sqlUpdate是直接将结果输出到了tablesink,所以要首先注册tablesink,方式如下:
TableSink csvSink = newCsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product","amount"};
TypeInformation[] fieldTypes ={Types.STRING, Types.INT};
tableEnv.registerTableSink("RubberOrders",fieldNames, fieldTypes, csvSink);
flink注册表的形式比较多,直接用数据源注册表,如:
tableEnv.registerExternalCatalog(); tableEnv.registerTableSource();
也可以从datastream转换成表,如:
tableEnv.registerDataStream("Orders",ds,?"user,?product,?amount"); Table?table?=?tableEnv.fromDataStream(ds,"user,?product,?amount");
?
|