IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> kafka保存偏移量到redis启动时根据偏移量进行修正 -> 正文阅读

[大数据]kafka保存偏移量到redis启动时根据偏移量进行修正

一、测试工具下载

1.1 下载zookeeper

https://zookeeper.apache.org/releases.html#download
测试我用的是 window版本zookeeper-3.6.3
在这里插入图片描述

1.2 生成并修改zoo.cfg

在zookeeper的conf目录下复制一份zoo_sample.cfg文件,并重命名为zoo.cfg

在这里插入图片描述
修改zoo.cfg文件里面的路径(dataDir,dataLogDir为新建目录) 配置以下参数

# 存放内存数据库快照的目录
dataDir=D:zookeeper\\apache-zookeeper-3.6.3-bin\\data
# 存放事务日志目录
dataLogDir=D:zookeeper\\apache-zookeeper-3.6.3-bin\\logs
# AdminServer端口
admin.serverPort=7070
#zookeeper新版本启动的过程中,zookeeper新增的审核日志是默认关闭,所以控制台输出ZooKeeper audit is disabled,标准的修改方式应该是在zookeeper的配置文件zoo.cfg新增一行audit.enable=true即可
audit.enable=true
#客户端口
clientPort=2181

1.3 启动服务

进入bin目录下,双击zkServer.cmd
在这里插入图片描述

1.4 验证是否安装成功

在bin目录下双击zkCli.cmd,打开客户端(此时的服务端zkServer的dos窗口不要关闭),出现"欢迎"字样,说明安装成功!
在这里插入图片描述

2.1 安装kafka

kafka官网下载地址: http://kafka.apache.org/downloads.html

我下载的kafka_2.12-3.1.0.tgz,并解压到D:\kafka目录下(解压的目录不能过深,启动脚本不支持很长的路径)
在这里插入图片描述

2.2启动kafka

  • 编辑文件Kafka配置文件, D:\kafka_2.13-2.8.0\config\server.properties
  • 找到并编辑log.dirs=D:\kafka_2.13-2.8.0\kafka-logs, (自定义文件夹)
  • 找到并编辑zookeeper.connect=localhost:2181。表示本地运行(默认的可以不改)
  • Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

1.进入Kafka安装目录,新建cmd窗口:

.\bin\windows\kafka-server-start.bat .\config\server.properties

注意:不要关了这个窗口,启用Kafka前请确保ZooKeeper实例已经准备好并开始运行

2.3 创建主题

#window脚本在window文件夹下
.\bin\windows\kafka-topics.bat--create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic01
.\bin\windows\kafka-topics.bat--create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic02

在这里插入图片描述

2.4写入数据

简单的往topic01里面写入10条数据
在这里插入图片描述

二 、代码

scala代码

package cn.ac.iie.hy.datatrans.offset

import cn.ac.iie.hy.datatrans.server.SparkStreamingKafkaOffsetRedisRecoveryNew.scala_convert
import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.{OffsetRequest, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.immutable.Map
import scala.collection.mutable
import scala.collection.mutable.HashMap


object SparkStreamingKafkaOffsetRedisRecoveryNew {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkstreamingkafkaoffset").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(3))

    streamingContext.sparkContext.setLogLevel("WARN")//设置日志级别

    val topics = Array("topic01", "topic02")
    val groups = Array("group01", "group02")
    val tuples = topics.zip(groups)

    //kafka的参数
    var kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      //修改为手动提交偏移量
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )


    tuples.foreach(topic_group => {
      val jedis = new Jedis("192.168.21.160", 6379)
      val topic = topic_group._1
      val groupId = topic_group._2

      val redisKey = s"${groupId}_${topic}"
      //判断redis中是否保存过历史的offset
      if (jedis.exists(redisKey)) {
        println(s"topic:$topic reids不存在offset!")
        val Java_offsetMap: java.util.Map[String, String] = jedis.hgetAll(redisKey) //partition -> offset
        val offsetMap: Map[String, String] = scala_convert(Java_offsetMap) //java Map 转scala Map
        val partitionToLong: mutable.HashMap[TopicPartition, Long] = getEffectiveOffsets(offsetMap, topic, "localhost")
        println("merge: 合并后的offset")
        println(partitionToLong.toBuffer)

        kafkaParams += "group.id" -> groupId
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          //位置策略
          PreferConsistent,
          //订阅的策略
          Subscribe[String, String](Array(topic), kafkaParams, partitionToLong)
        )
        stream.foreachRDD { (rdd, time) =>
          //获取该RDD对应的偏移量,记住只有kafka的rdd才能强转成HasOffsetRanges类型
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          //执行这个rdd的aciton,这里rdd的算子是在集群上执行的
          rdd.foreach { line =>
            //println(s"time:${time}==>${line.key()} ${line.value()}")
          }
          //foreach和foreachPartition的区别
          //foreachPartition不管有没有数据都会执行自己的function
          //foreach只在有数据时执行自己的function
          //      rdd.foreachPartition(it =>{
          //        val list: List[ConsumerRecord[String, String]] = it.toList
          //        println(list)
          //      })

          //将offset保存回redis
          val pipeline = jedis.pipelined()
          offsetRanges.foreach(eachRange => {
            /**
             * redis结构
             *  key: {groupId}_${topic}
             *  value :Map(partition -> offset )
             */
            val topic: String = eachRange.topic
            val fromOffset: Long = eachRange.fromOffset
            val endOffset: Long = eachRange.untilOffset
            val partition: Int = eachRange.partition

            val redisKey = s"${groupId}_${topic}"
            pipeline.hset(redisKey,partition.toString,endOffset.toString)
            println(s"time $time topic:${eachRange.topic} partitioner:${eachRange.partition}_offset :  ${eachRange.untilOffset.toString}")
          })
          pipeline.sync()
        }
      }else{
        println(s"topic:$topic reids不存在offset!")
        kafkaParams += "group.id" -> groupId
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          //位置策略
          PreferConsistent,
          //订阅的策略
          Subscribe[String, String](Array(topic), kafkaParams)
        )

        stream.foreachRDD { (rdd, time) =>
          //获取该RDD对应的偏移量,记住只有kafka的rdd才能强转成HasOffsetRanges类型
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          //执行这个rdd的aciton,这里rdd的算子是在集群上执行的
          rdd.foreach { line =>
            //println(s"time:${time}==>${line.key()} ${line.value()}")
          }

          //将offset保存回redis
          val pipeline = jedis.pipelined()
          offsetRanges.foreach(eachRange => {
            /**
             * redis结构
             *  key: {groupId}_${topic}
             *  value :Map(partition -> offset )
             */
            val topic: String = eachRange.topic
            val fromOffset: Long = eachRange.fromOffset
            val endOffset: Long = eachRange.untilOffset
            val partition: Int = eachRange.partition

            val redisKey = s"${groupId}_${topic}"
            pipeline.hset(redisKey,partition.toString,endOffset.toString)
           println(s"time $time topic:${eachRange.topic} partitioner:${eachRange.partition} _offset :  ${eachRange.untilOffset.toString}")
          })
          pipeline.sync()

        }

      }
    })


    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def getEffectiveOffsets(offsetMap: Map[String, String], topic: String, host: String): HashMap[TopicPartition, Long] = {
    // 存储Redis的offset
    val redisOffsetMap = new HashMap[TopicPartition, Long]

    offsetMap.foreach(patition_offset => {
      val tp = new TopicPartition(topic, patition_offset._1.toInt)
      redisOffsetMap += tp -> patition_offset._2.toLong
    })

    println(s" kafka该主题:$topic  ----------Redis 维护的offset----------------")
    println(redisOffsetMap.toBuffer)

    //**********用于解决SparkStreaming程序长时间中断,再次消费时已记录的offset丢失导致程序启动报错问题
    import scala.collection.mutable.Map
    //存储kafka集群中每个partition当前最早的offset
    val clusterEarliestOffsets = Map[Long, Long]()
    val consumer: SimpleConsumer = new SimpleConsumer(host, 9092, 100000, 64 * 1024,
      "leaderLookup" + System.currentTimeMillis())
    //使用隐式转换进行java和scala的类型的互相转换
    import scala.collection.convert.wrapAll._
    val request: TopicMetadataRequest = new TopicMetadataRequest(List(topic))
    val response: TopicMetadataResponse = consumer.send(request)
    consumer.close()

    //<topic1_Metadata(p1,p2) topic2_Metadata(p1)> => <topic1_Metadata_p1 ,topic1_Metadata_p2 ,topic1_Metadata_p1>
    val metadatas: mutable.Buffer[PartitionMetadata] = response.topicsMetadata.flatMap(f => f.partitionsMetadata)
    //从kafka集群中得到当前每个partition最早的offset值
    metadatas.map(f => {
      val partitionId: Int = f.partitionId
      val leaderHost: String = f.leader.host
      val leaderPort: Int = f.leader.port
      val clientName: String = "Client_" + topic + "_" + partitionId
      val consumer: SimpleConsumer = new SimpleConsumer(leaderHost, leaderPort, 100000,
        64 * 1024, clientName)

      val topicAndPartition = new TopicAndPartition(topic, partitionId)
      val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]();
      //kafka.api.OffsetRequest.LatestTime
      requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1));
      val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
      val response = consumer.getOffsetsBefore(request)
      val offsets: Array[Long] = response.offsets(topic, partitionId)
      consumer.close()
      clusterEarliestOffsets += ((partitionId, offsets(0)))
    })
    println(s"-------topic:$topic 最早offset--------------")
    println(clusterEarliestOffsets)

     //外循环是kafka 最早offsets
        for ((clusterPartition, clusterEarliestOffset) <- clusterEarliestOffsets) {
          val tp = new TopicPartition(topic, clusterPartition.toInt)
          val option: Option[Long] = redisOffsetMap.get(tp)

          // kafka 有的分区,但Redis 没有, 原因:kafka新增了分区
          if (option.isEmpty) { //取最早的offset
            println(s"====>topic:$topic 新增了分区: $tp")
            redisOffsetMap += (tp -> clusterEarliestOffset)
          } else {
            var redisOffset: Long = option.get
            if (redisOffset < clusterEarliestOffset) { //redis中存的offset对比最早的offset已经丢失,取最早的offset
              redisOffset = clusterEarliestOffset
              redisOffsetMap += (tp -> redisOffset)
            }
          }
        }
    redisOffsetMap
  }
}

第一次启动

程序前台打印:(订阅了topic01)
在这里插入图片描述

redis monitor监控

在这里插入图片描述

  • 为了看着方便就没有前台打印数据,从redis维护的offset可以看到已经把10条数据消费完了

三、实际解决的问题

1.sparkstreaming启动时offset超出范围,程序可以进行修正到目前最早offset

这个原因有2个,第一个可能是kafka的默认保存logs文件过期了,第二个可能是存储压力大人为的删除了kafka数据(我简单的用第一种情况模拟下)
1.1修改 server.properties 文件 将过期时间调整至3分钟
log.retention.minutes=3

在这里插入图片描述

  • 重启kafka 等待日志过期
    此时redis保存的offset为16(),再写入4条等待过期后,启动程序
    • 用命令行查看数据已经被清除
      在这里插入图片描述
      程序对过期的offset进行了修正
      在这里插入图片描述
      在这里插入图片描述

2.kafka某个topic新增了分区,程序启动时能感知到并在更新到redis进行消费

将topic02增加到5个分区

在这里插入图片描述
在这里插入图片描述
启动程序
在这里插入图片描述
在这里插入图片描述

  • 可以看到已经将新分区的offset信息保存到redis并进行了订阅
用到的命令
#启动redis
redis-server /usr/local/redis/redis.conf
#监控redis
redis-cli -h localhost -p  6379 monitor

#启动kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties

#创建topic
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic01

#生产者
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic01

#消费者
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic01 --from-beginning

#查看所有分区
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list

#查看分区
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic topic02

#增加分区
.\bin\windows\kafka-topics.bat  --bootstrap-server localhost:9092 --alter --topic topic02 --partitions 5
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-13 11:22:43  更:2022-09-13 11:26:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 23:32:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码