一、Downloading the test tools
1.1 Download ZooKeeper
Download page: https://zookeeper.apache.org/releases.html#download . For this test I used the Windows build, zookeeper-3.6.3.
1.2 Create and edit zoo.cfg
In ZooKeeper's conf directory, make a copy of zoo_sample.cfg and rename it to zoo.cfg.
Edit the paths in zoo.cfg (dataDir and dataLogDir should point to directories you create yourself) and set the following parameters:
dataDir=D:\\zookeeper\\apache-zookeeper-3.6.3-bin\\data
dataLogDir=D:\\zookeeper\\apache-zookeeper-3.6.3-bin\\logs
# move the embedded AdminServer off its default port 8080 to avoid conflicts
admin.serverPort=7070
audit.enable=true
clientPort=2181
1.3 Start the server
Go into the bin directory and double-click zkServer.cmd.
1.4 Verify the installation
In the bin directory, double-click zkCli.cmd to open the client (keep the zkServer console window open). If the "Welcome" banner appears, the installation succeeded.
2.1 Install Kafka
Kafka download page: http://kafka.apache.org/downloads.html
I downloaded kafka_2.12-3.1.0.tgz and extracted it under D:\kafka (do not use a deeply nested directory; the startup scripts cannot handle very long paths).
2.2 Start Kafka
- Edit the Kafka configuration file config\server.properties under your install directory, e.g. D:\kafka\kafka_2.12-3.1.0\config\server.properties
- Find log.dirs and point it at a directory of your choice, e.g. log.dirs=D:\kafka\kafka-logs
- Find zookeeper.connect=localhost:2181; this points at the local ZooKeeper (the default value can be left unchanged).
- By default Kafka listens on port 9092 and connects to ZooKeeper on its default port 2181.
1. Open a new cmd window in the Kafka install directory and run:
.\bin\windows\kafka-server-start.bat .\config\server.properties
Note: do not close this window, and make sure the ZooKeeper instance is up and running before starting Kafka.
2.3 Create the topics
# the Windows scripts live in the windows folder
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic01
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic02
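If you would rather create the topics from code than from the .bat scripts, here is a minimal sketch (not part of the original setup) using Kafka's AdminClient, assuming a broker on localhost:9092:

import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopics {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    val admin = AdminClient.create(props)
    try {
      // one partition and replication factor 1, matching the .bat commands above
      val topics = java.util.Arrays.asList(
        new NewTopic("topic01", 1, 1.toShort),
        new NewTopic("topic02", 1, 1.toShort)
      )
      admin.createTopics(topics).all().get() // block until the topics exist
    } finally {
      admin.close()
    }
  }
}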
2.4 Write test data
Simply write 10 messages into topic01, either with the console producer listed under "Commands used" at the end, or with the small producer sketch below.
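A minimal Scala producer sketch (not from the original post) that writes 10 test messages into topic01, assuming the broker runs on localhost:9092:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      // the payload content does not matter for the offset test
      (1 to 10).foreach { i =>
        producer.send(new ProducerRecord[String, String]("topic01", s"message-$i"))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}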
二、Code
The Scala code:
package cn.ac.iie.hy.datatrans.offset

import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.consumer.SimpleConsumer
import kafka.javaapi.{OffsetRequest, PartitionMetadata, TopicMetadataRequest, TopicMetadataResponse}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

import scala.collection.JavaConverters._
import scala.collection.immutable.Map
import scala.collection.mutable
import scala.collection.mutable.HashMap
object SparkStreamingKafkaOffsetRedisRecoveryNew {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkstreamingkafkaoffset").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(3))
    streamingContext.sparkContext.setLogLevel("WARN")

    // each topic is consumed by its own consumer group
    val topics = Array("topic01", "topic02")
    val groups = Array("group01", "group02")
    val tuples = topics.zip(groups)

    var kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are committed to Redis manually
    )
    tuples.foreach(topic_group => {
      val jedis = new Jedis("192.168.21.160", 6379)
      val topic = topic_group._1
      val groupId = topic_group._2
      // offsets live in a Redis hash keyed "<group>_<topic>"; field = partition, value = offset
      val redisKey = s"${groupId}_${topic}"
      if (jedis.exists(redisKey)) {
        println(s"topic:$topic offsets found in redis")
        val javaOffsetMap: java.util.Map[String, String] = jedis.hgetAll(redisKey)
        val offsetMap: Map[String, String] = javaOffsetMap.asScala.toMap
        // reconcile the offsets stored in Redis with what the brokers actually hold
        val partitionToLong: mutable.HashMap[TopicPartition, Long] = getEffectiveOffsets(offsetMap, topic, "localhost")
        println("merged offsets:")
        println(partitionToLong.toBuffer)
        kafkaParams += "group.id" -> groupId
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Subscribe[String, String](Array(topic), kafkaParams, partitionToLong)
        )
        stream.foreachRDD { (rdd, time) =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreach { line =>
            // business logic would go here; this demo only tracks offsets
          }
          // write the end offset of every partition back to Redis in one pipeline
          val pipeline = jedis.pipelined()
          offsetRanges.foreach(eachRange => {
            val topic: String = eachRange.topic
            val fromOffset: Long = eachRange.fromOffset
            val endOffset: Long = eachRange.untilOffset
            val partition: Int = eachRange.partition
            val redisKey = s"${groupId}_${topic}"
            pipeline.hset(redisKey, partition.toString, endOffset.toString)
            println(s"time $time topic:${eachRange.topic} partition:${eachRange.partition} offset:${eachRange.untilOffset.toString}")
          })
          pipeline.sync()
        }
      } else {
        println(s"topic:$topic no offsets in redis, relying on auto.offset.reset")
        kafkaParams += "group.id" -> groupId
        val stream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          Subscribe[String, String](Array(topic), kafkaParams)
        )
        stream.foreachRDD { (rdd, time) =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          rdd.foreach { line =>
            // business logic would go here; this demo only tracks offsets
          }
          // write the end offset of every partition back to Redis in one pipeline
          val pipeline = jedis.pipelined()
          offsetRanges.foreach(eachRange => {
            val topic: String = eachRange.topic
            val fromOffset: Long = eachRange.fromOffset
            val endOffset: Long = eachRange.untilOffset
            val partition: Int = eachRange.partition
            val redisKey = s"${groupId}_${topic}"
            pipeline.hset(redisKey, partition.toString, endOffset.toString)
            println(s"time $time topic:${eachRange.topic} partition:${eachRange.partition} offset:${eachRange.untilOffset.toString}")
          })
          pipeline.sync()
        }
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }

  /**
   * Merge the offsets recovered from Redis with the earliest offsets currently available
   * on the brokers: partitions missing from Redis are added, and stored offsets that have
   * fallen behind the brokers' earliest offset (e.g. because log segments expired) are
   * bumped forward to that earliest offset.
   */
  def getEffectiveOffsets(offsetMap: Map[String, String], topic: String, host: String): HashMap[TopicPartition, Long] = {
    // offsets as stored in Redis
    val redisOffsetMap = new HashMap[TopicPartition, Long]
    offsetMap.foreach(partition_offset => {
      val tp = new TopicPartition(topic, partition_offset._1.toInt)
      redisOffsetMap += tp -> partition_offset._2.toLong
    })
    println(s"topic:$topic ---------- offsets maintained in Redis ----------")
    println(redisOffsetMap.toBuffer)

    // look up the earliest available offset of every partition via the old SimpleConsumer API
    import scala.collection.mutable.Map
    val clusterEarliestOffsets = Map[Long, Long]()
    val consumer: SimpleConsumer = new SimpleConsumer(host, 9092, 100000, 64 * 1024,
      "leaderLookup" + System.currentTimeMillis())
    import scala.collection.convert.wrapAll._
    val request: TopicMetadataRequest = new TopicMetadataRequest(List(topic))
    val response: TopicMetadataResponse = consumer.send(request)
    consumer.close()
    val metadatas: mutable.Buffer[PartitionMetadata] = response.topicsMetadata.flatMap(f => f.partitionsMetadata)
    metadatas.map(f => {
      val partitionId: Int = f.partitionId
      val leaderHost: String = f.leader.host
      val leaderPort: Int = f.leader.port
      val clientName: String = "Client_" + topic + "_" + partitionId
      // ask the partition leader for its earliest available offset
      val consumer: SimpleConsumer = new SimpleConsumer(leaderHost, leaderPort, 100000,
        64 * 1024, clientName)
      val topicAndPartition = new TopicAndPartition(topic, partitionId)
      val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
      requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime, 1))
      val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
      val response = consumer.getOffsetsBefore(request)
      val offsets: Array[Long] = response.offsets(topic, partitionId)
      consumer.close()
      clusterEarliestOffsets += ((partitionId, offsets(0)))
    })
    println(s"------- topic:$topic earliest offsets on the brokers -------")
    println(clusterEarliestOffsets)

    // merge: seed new partitions and fix offsets older than the earliest available one
    for ((clusterPartition, clusterEarliestOffset) <- clusterEarliestOffsets) {
      val tp = new TopicPartition(topic, clusterPartition.toInt)
      val option: Option[Long] = redisOffsetMap.get(tp)
      if (option.isEmpty) {
        println(s"====> topic:$topic new partition: $tp")
        redisOffsetMap += (tp -> clusterEarliestOffset)
      } else {
        var redisOffset: Long = option.get
        if (redisOffset < clusterEarliestOffset) {
          redisOffset = clusterEarliestOffset
          redisOffsetMap += (tp -> redisOffset)
        }
      }
    }
    redisOffsetMap
  }
}
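For reference, the job stores its offsets in a Redis hash whose key is "<groupId>_<topic>" (so group01_topic01 and group02_topic02 here), with the partition number as the hash field and the end offset as the value. A quick way to inspect what has been written, sketched here with the same Jedis client the job uses (not part of the original post):

import redis.clients.jedis.Jedis

object InspectOffsets {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("192.168.21.160", 6379)
    // hgetAll returns a java.util.Map of partition -> last committed offset
    println(jedis.hgetAll("group01_topic01"))
    println(jedis.hgetAll("group02_topic02"))
    jedis.close()
  }
}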
First run
Console output of the program (subscribed to topic01):
redis monitor output:
- To keep the output readable, the records themselves are not printed to the console; the offsets maintained in Redis show that all 10 messages have been consumed.
三、Problems actually solved
1. When Spark Streaming starts with offsets that are out of range, the program corrects them to the current earliest offset.
There are two common causes: either the Kafka log files expired under the retention settings, or the Kafka data was deleted manually to relieve storage pressure. (Below I simulate the first case.)
1.1 Edit server.properties and shorten the retention time to 3 minutes
log.retention.minutes=3
- Restart Kafka and wait for the logs to expire (deletion only happens when the background retention check runs; its interval, log.retention.check.interval.ms, defaults to 5 minutes, so it can take a few extra minutes).
At this point the offset stored in Redis is 16. Write 4 more messages, wait for them to expire, then start the program.
- Checking from the command line confirms that the data has been cleared.
The program corrects the expired offsets; the check responsible for this is shown below.
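The correction is the merge step at the end of getEffectiveOffsets above, repeated here stripped down to the relevant check:

// redisOffset was recovered from Redis; clusterEarliestOffset is the earliest
// offset the broker still has for this partition
if (redisOffset < clusterEarliestOffset) {
  // the stored offset points at data that no longer exists,
  // so consumption resumes from the earliest available offset instead
  redisOffsetMap += (tp -> clusterEarliestOffset)
}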
2. When partitions are added to a Kafka topic, the program detects them at startup, records them in Redis, and consumes them.
Increase topic02 to 5 partitions.
Start the program.
- You can see that the offsets of the new partitions have been saved to Redis and that the new partitions are being consumed; the branch responsible is shown below.
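This case is also handled by the merge step in getEffectiveOffsets: a partition that the broker reports but that has no entry in Redis yet is seeded with the broker's earliest offset, so the new partitions are consumed from the beginning:

if (option.isEmpty) { // partition exists on the broker but not in Redis yet
  println(s"====> topic:$topic new partition: $tp")
  redisOffsetMap += (tp -> clusterEarliestOffset)
}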
Commands used
# start redis
redis-server /usr/local/redis/redis.conf
# monitor redis
redis-cli -h localhost -p 6379 monitor
# start kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
# create a topic
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic01
# console producer
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic topic01
# console consumer
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic01 --from-beginning
# list all topics
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --list
# describe a topic's partitions
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic topic02
# add partitions
.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic topic02 --partitions 5