kafka数据到hudi丢失数据问题
1.报错问题
Caused by: java.lang.IllegalStateException: Cannot fetch offset 196 (GroupId: spark-kafka-source-6f1df211-fdcb-4bcc-813d-55c4f9661c9d-1732697149-executor, TopicPartition: news-0).
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss0(KafkaDataConsumer.scala:642)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$reportDataLoss(KafkaDataConsumer.scala:448)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:269)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:234)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:209)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:234)
2.根据提示添加配置文件 -> option(“failOnDataLoss”,“false”)
val df: DataFrame = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", params.brokerList)
.option("subscribe", params.topic)
.option("startingOffsets", "latest")
.option("kafka.consumer.commit.groupid", "action-log-group01")
.option("failOnDataLoss","false")
.load()
tips:认为添加这个配置不太妥当,但尚未找到适宜的方法 哪位博主知道的,希望可以指点指点
|