IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark中的checkpoint机制 -> 正文阅读

[大数据]Spark中的checkpoint机制

一.Spark Core中的checkpoint

def main(args: Array[String]) {
  val spark = SparkSession.builder().appName("Checkpoint Test").master("local[2]")
    .getOrCreate()

  val sc = spark.sparkContext
  sc.setCheckpointDir("checkpoint")

  val data = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'))

  val pairs = sc.parallelize(data, 3)
  pairs.cache()
  pairs.checkpoint()
  println(pairs.count)
}

请添加图片描述

二.Spark Streaming中的checkpoint

checkpoint主要保存:1.metadata(一些配置) 2.RDD数据(保存状态)

1.无状态

def main(args: Array[String]): Unit = {

  val checkpointDirectory = "offset/checkpoints"

  def functionToCreateContext(): StreamingContext = {
    val sparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")

    val ssc = new StreamingContext(sparkConf,Seconds(2))  
    ssc.checkpoint(checkpointDirectory) 

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop000:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-spark-group-1",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("my-topic")

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    
    val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))
    words.print()
    
    ssc
  }

  val ssc = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

  ssc.start()
  ssc.awaitTermination()
}

请添加图片描述

2.自定义有状态

用到updateStateByKey函数进行状态保存。

val words = stream.flatMap(_.value().split(" ")).map(word => (word, 1))
  .updateStateByKey[Int](updateFunction _)  
words.print()


def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  val newCount = newValues.sum
  val old = runningCount.getOrElse(0)
  Some(newCount+old)
}

请添加图片描述
checkpoint-time这些文件就是元数据,保存ssc的配置和状态。 如果driver挂掉,yarn会自动重启AM,则通过StreamingContext.getOrCreate来重新获得ssc,不用getOrCreate,则每次重启都会重新new一个ssc对象,会丢掉过去信息。

  • 元数据保存当前应用的信息,如果更改应用,会出问题,比如批次间隔以前是10s,现在改为2s,跑的时候依然是10s。
  • 所以用来保存offset的话,程序还是会用以前的ssc的配置和状态,但是更改应用后,处理起来会很麻烦

UUID文件夹下就是保存状态的RDD数据。

3.窗口函数中的有状态和无状态

单纯的window函数是无状态的。
countByWindow函数是有状态的,需要checkpoint来保存状态。

三.Spark Structured Streaming中的checkpoint

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder().master("local[2]")
    .appName(this.getClass.getName)
    .config("spark.sql.shuffle.partitions", 10)
    .getOrCreate()

  import spark.implicits._

  val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "hadoop000:9092")
    .option("subscribe", "my-topic")
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]
    .flatMap(_.split(" "))
    .map(word => (word, 1))
    //.groupBy("value")
    //.count()
    .writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation","sss/chk")
    .start()
    .awaitTermination()
}

请添加图片描述内置batchID,重启前batchID为5,重新启动后,第一个batchID为6,batchID是全局唯一的。

offsets文件夹下每个文件都是一个batch的偏移量。

{"batchWatermarkMs":0,
 "batchTimestampMs":1647251960388,
 "conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider",
         "spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2",
         "spark.sql.streaming.multipleWatermarkPolicy":"min",
         "spark.sql.streaming.aggregation.stateFormatVersion":"2",
         "spark.sql.shuffle.partitions":"10"}}
{"my-topic":{"0":81}}

稍微更改下业务逻辑,flatMap(.split(" ")) 改为 flatMap(.split(",")) 依然能继续消费,ss也能。
但是ss取消map(word => (word, 1))操作,会报错,sss不会。因为ss的checkpoint中写死了操作集的元数据信息。

sss中append模式改为complete模式则会报错。

.groupBy("value")
.count()
.writeStream
.outputMode("complete")

使用聚合操作,会产生state文件夹,这是内存中的状态持久化到容错存储里。

.flatMap(_.split(","))
.groupBy("value")
.count()
.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation","sss/chk")

请添加图片描述config(“spark.sql.shuffle.partitions”, 5)这个参数决定了state/0 下文件夹下的个数。
delta前的数字每个批次完成后加一。

四.参考文章


checkpoint保存了什么?
Yarn上常驻Spark-Streaming程序调优
还在把offset存ZooKeeper吗?Structured Streaming checkpoint是这么用的

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 22:37:16  更:2022-03-15 22:37:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:53:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码