IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark Structured Streaming Kafka offset提交监控 -> 正文阅读

[大数据]Spark Structured Streaming Kafka offset提交监控

StreamingQueryListener
StreamingQueryListener,即监听StreamingQuery各种事件的接口,如下:
?

abstract class StreamingQueryListener {

? import StreamingQueryListener._

? // 查询开始时调用
? def onQueryStarted(event: QueryStartedEvent): Unit

? // 查询过程中状态发生更新时调用
? def onQueryProgress(event: QueryProgressEvent): Unit

? // 查询结束时调用
? def onQueryTerminated(event: QueryTerminatedEvent): Unit
}


在QueryProgressEvent中,我们是可以拿到每个Source消费的Offset的。因此,基于StreamingQueryListener,可以将消费的offset的提交到kafka集群,进而实现对Kafka Lag的监控。

基于StreamingQueryListener向Kafka提交Offset
监控Kafka Lag的关键是能够向Kafka集群提交消费的Offset,以下示例演示了如何通过StreamingQueryListener向Kafka提交Offset。

KafkaOffsetCommiter


package com.bigdata.structured.streaming.monitor

import java.util
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.slf4j.LoggerFactory

/**
? * Author: Wang Pei
? * Summary:
? * ? 向Kafka集群提交Offset的Listener
? */


class KafkaOffsetCommiter(brokers: String, group: String) extends StreamingQueryListener {

? val logger = LoggerFactory.getLogger(this.getClass)

? // Kafka配置
? val properties= new Properties()
? properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
? properties.put(ConsumerConfig.GROUP_ID_CONFIG, group)
? properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
? properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
? val kafkaConsumer = new KafkaConsumer[String, String](properties)

? def onQueryStarted(event: QueryStartedEvent): Unit = {}

? def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

? // 提交Offset
? def onQueryProgress(event: QueryProgressEvent): Unit = {

? ? // 遍历所有Source
? ? event.progress.sources.foreach(source=>{

? ? ? val objectMapper = new ObjectMapper()
? ? ? ? .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
? ? ? ? .configure(DeserializationFeature.USE_LONG_FOR_INTS, true)
? ? ? ? .registerModule(DefaultScalaModule)

? ? ? val endOffset = objectMapper.readValue(source.endOffset,classOf[Map[String, Map[String, Long]]])

? ? ? // 遍历Source中的每个Topic
? ? ? for((topic,topicEndOffset) <- endOffset){
? ? ? ? val topicPartitionsOffset = new util.HashMap[TopicPartition, OffsetAndMetadata]()

? ? ? ? //遍历Topic中的每个Partition
? ? ? ? for ((partition,offset) <- topicEndOffset) {
? ? ? ? ? val topicPartition = new TopicPartition(topic, partition.toInt)
? ? ? ? ? val offsetAndMetadata = new OffsetAndMetadata(offset)
? ? ? ? ? topicPartitionsOffset.put(topicPartition,offsetAndMetadata)
? ? ? ? }

? ? ? ? logger.warn(s"提交偏移量... Topic: $topic Group: $group Offset: $topicEndOffset")
? ? ? ? kafkaConsumer.commitSync(topicPartitionsOffset)
? ? ? }
? ? })
? }
}


Structured Streaming App


package com.bigdata.structured.streaming.monitor

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

/**
? * Author: Wang Pei
? * Summary:
? * ? 读取Kafka数据
? */
object ReadKafkaApp {
? def main(args: Array[String]): Unit = {

? ? val kafkaBrokers="kafka01:9092,kafka02:9092,kafka03:9092"
? ? val kafkaGroup="read_kafka_c2"
? ? val kafkaTopics1="topic_1,test_2"
? ? val kafkaTopics2="test_3"
? ? val checkpointDir="/Users/wangpei/data/apps/read_kafka/checkpoint/"
? ? val queryName="read_kafka"

? ? val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate()
? ? import spark.implicits._

? ? // 添加监听器
? ? val kafkaOffsetCommiter = new KafkaOffsetCommiter(kafkaBrokers,kafkaGroup)
? ? spark.streams.addListener(kafkaOffsetCommiter)

? ? // Kafka数据源1
? ? val inputTable1=spark
? ? ? .readStream
? ? ? .format("kafka")
? ? ? .option("kafka.bootstrap.servers",kafkaBrokers )
? ? ? .option("subscribe",kafkaTopics1)
? ? ? .load()
? ? ? .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
? ? ? .as[(String, String)]
? ? ? .select($"value")

? ? // Kafka数据源2
? ? val inputTable2=spark
? ? ? .readStream
? ? ? .format("kafka")
? ? ? .option("kafka.bootstrap.servers",kafkaBrokers )
? ? ? .option("subscribe",kafkaTopics2)
? ? ? .load()
? ? ? .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
? ? ? .as[(String, String)]
? ? ? .select($"value")

? ? // 结果表
? ? val resultTable = inputTable1.union(inputTable2)

? ? // 启动Query
? ? val query: StreamingQuery =resultTable
? ? ? .writeStream
? ? ? .format("console")
? ? ? .option("truncate","false")
? ? ? .outputMode("append")
? ? ? .trigger(Trigger.ProcessingTime("2 seconds"))
? ? ? .queryName(queryName)
? ? ? .option("checkpointLocation", checkpointDir)
? ? ? .start()

? ? spark.streams.awaitAnyTermination()

? }
}


查看Kafka Offset 可通过以下命令查看Topic消费者组对应的Offset。

bin/kafka-consumer-offset-checker.sh --zookeeper kafka01:2181 ?--topic test_3 --group read_kafka_c2
Group ? ? ? ? ? Topic ? ? ? ? ? ? ? ? ? ? ? ? ?Pid Offset ? ? ? ? ?logSize ? ? ? ? Lag ? ? ? ? ? ? Owner
read_kafka_c2 ? test_3 ? ? ? ? ? ? ? ? ? ? ? ? 0 ? 32 ? ? ? ? ? ? ?32 ? ? ? ? ? ? ?0 ? ? ? ? ? ? ? none
read_kafka_c2 ? test_3 ? ? ? ? ? ? ? ? ? ? ? ? 1 ? 32 ? ? ? ? ? ? ?32 ? ? ? ? ? ? ?0 ? ? ? ? ? ? ? none
read_kafka_c2 ? test_3 ? ? ? ? ? ? ? ? ? ? ? ? 2 ? 34 ? ? ? ? ? ? ?34 ? ? ? ? ? ? ?0 ? ? ? ? ? ? ? none
?


?
同理,可查看另外两个Topic对应的Group的Offset。

更多可参照项目:GitHub - HeartSaVioR/spark-sql-kafka-offset-committer: Kafka offset committer for structured streaming query

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-26 12:16:31  更:2021-10-26 12:17:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 3:39:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码