一.Spark Core中的checkpoint
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]")
.getOrCreate()
val sc = spark.sparkContext
sc.setCheckpointDir("checkpoint")
val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))
val pairs = sc.parallelize(data, 3)
pairs.cache()
pairs.checkpoint()
println(pairs.count)
}
二.Spark Streaming中的checkpoint
checkpoint主要保存:1.metadata(一些配置) 2.RDD数据(保存状态)
1.无状态
def main(args: Array[String]): Unit = {
val checkpointDirectory = "offset/checkpoints"
def functionToCreateContext(): StreamingContext = {
val sparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName)
.setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(2))
ssc.checkpoint(checkpointDirectory)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hadoop000:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "my-spark-group-1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("my-topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))
words.print()
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
}
2.自定义有状态
用到updateStateByKey函数进行状态保存。
val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))
.updateStateByKey[Int](updateFunction _)
words.print()
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = newValues.sum
val old = runningCount.getOrElse(0)
Some(newCount+old)
}
checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。
- 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
- 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦。
UUID文件夹下就是保存状态的RDD数据。
3.窗口函数中的有状态和无状态
单纯的window函数是无状态的。 countByWindow函数是有状态的,需要checkpoint来保存状态。
三.Spark Structured Streaming中的checkpoint
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]")
.appName(this.getClass.getName)
.config("spark.sql.shuffle.partitions", 10)
.getOrCreate()
import spark.implicits._
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "hadoop000:9092")
.option("subscribe", "my-topic")
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.flatMap(_.split(" "))
.map(word => (word, 1))
.writeStream
.outputMode("append")
.format("console")
.option("checkpointLocation","sss/chk")
.start()
.awaitTermination()
}
内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。
offsets文件夹下每个文件都是一个batch的偏移量。
{"batchWatermarkMs":0,
"batchTimestampMs":1647251960388,
"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
"spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2",
"spark.sql.streaming.multipleWatermarkPolicy":"min",
"spark.sql.streaming.aggregation.stateFormatVersion":"2",
"spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}
稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。 但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。
sss中append模式改为complete模式则会报错。
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。
.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")
config(“spark.sql.shuffle.partitions”, 5)这个参数决定了state/0 下文件夹下的个数。 delta前的数字每个批次完成后加一。
四.参考文章
checkpoint保存了什么? Yarn上常驻Spark-Streaming程序调优 还在把offset存ZooKeeper吗?Structured Streaming checkpoint是这么用的
|