Flink 的状态一致性
什么是状态一致性
有状态的流处理,每个算子任务都可以有自己的状态。所谓的状态一致性, 其实就是我们所说的计算结果要保证准确。一条数据不应该被丢失,也不应该被 重复计算。在遇到故障时可以恢复状态,恢复以后得重新计算,结果应该也是完 全正确的。
状态一致性的分类
At-Most-Once(最多一次): 当任务故障时,最简单的做法就是什么都不干,既不恢复丢失的数据,也不 重复数据。最多处理一次事件。数据可能会丢失。但是处理的速度快。 At-Least-Once(至少一次) : 在大多数的真实应用场景,我们不希望数据丢失。所有的事件都会被处理, 而且可以被多次处理。 Exactly-Once(精确一次) : 恰好保证每个事件只被处理了一次,既没有数据丢失,也没有数据重复处理 的情况出现
端到端的状态一致性
Flink 使用轻量级快照机制—checkpoint 来保证 exactly-once 语义。 目前我们看到的一致性保证都是在 Flink 流处理器内部保证的,而在真实的 开发中,流处理除了流处理器以外,还包含了数据源(如 kafka)和输出到持久 化系统。端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终, 每一个组件都保证了它自己的一致性。
端到端的 Exactly-Once
内部保证:checkpoint Source 端:可重置数据的读取位置,比如 kafka 的偏移量可以手动维护,提 交。 Sink 端:从故障恢复时,数据不会重复写入外部系统。(幂等写入、事务写 入)
注: 幂等写入:就是说一个操作,可以重复执行很多次,但只导致一次结果更改, 后面再重复执行就不起作用了。 事务写入:原子性,一个事务中的一系列操作,要么全部成功,要么一个不 做。 实现的思想,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时 候,才把所有对应的结果写入 Sink 系统中。 实现方式,预写日志(GenericWriteAheadSink)和两阶段提交(TwoPhaseCommitSinkFunction)。
案例:
Flink 与 Kafka 端到端的 Exactly-Once: Flink 内部:利用 checkpoint 机制,把状态存盘,发生故障时可以恢复,保证 内部的状态一致性。 Source:KafkaConsumer 作为 Source,可以将偏移量作为状态保存下来,如果后续任务发现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据, 保证一致性。 Sink : KafkaProducer 作 为 Sink , 采 用 两 阶 段 提 交 Sink , 需 要 实 现 TwoPhaseCommitSinkFunction。
package cn.jixiang.checkpoint
import java.lang
import java.util.{Properties, Random}
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.time.Time
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api._
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
object End2EndExactlyOnce {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC)
val hashMapStateBackend = new HashMapStateBackend()
env.setStateBackend(new HashMapStateBackend())
env.getCheckpointConfig.setCheckpointStorage("file:///D:\\Note\\Projects\\02\\Flink\\cha01\\ckp")
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
env.getCheckpointConfig.setCheckpointTimeout(60000)
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(10)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.milliseconds(600)))
val props1 = new Properties()
props1.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092")
props1.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1")
props1.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000")
props1.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
props1.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
val kafkaSource = new FlinkKafkaConsumer[String]("test1",new SimpleStringSchema(),props1)
kafkaSource.setCommitOffsetsOnCheckpoints(true)
val inputData: DataStream[String] = env.addSource(kafkaSource)
val result = inputData
.flatMap(_.split(" "))
.map(t => {
val random = new Random()
val num = random.nextInt(5)
if (num == 2){
println(num)
throw new Exception("哎呀呀,是异常呀")
}
(t,1)
})
.keyBy(_._1)
.sum(1)
.map(t => t._1 + ":" + t._2)
val props2 = new Properties()
props2.setProperty("bootstrap.servers", "master:9092")
props2.setProperty("transaction.timeout.ms",1000*60*5+"")
val myProducer = new FlinkKafkaProducer[String]("test2",
new KafkaSerializationSchema[String]() {
override def serialize(element: String, timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
new ProducerRecord[Array[Byte], Array[Byte]]("test2",element.getBytes,element.getBytes("utf-8"))
}
},
props2,
Semantic.EXACTLY_ONCE
)
result.print()
result.addSink(myProducer)
env.execute("Flink + Kafka")
}
}
|