IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark程序 读取kafka中数据处理后向mysql写时报错 -> 正文阅读

[大数据]Spark程序 读取kafka中数据处理后向mysql写时报错

Spark程序 读取kafka中数据处理后向mysql写时报错: ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 9) java.io.NotSerializableException:org.apache.kafka.clients.consumer.ConsumerRecord Serialization stack:

ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 9)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
	- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
	- element of array (index: 0)
	- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
21/07/06 18:00:36 ERROR TaskSetManager: Task 0.0 in stage 2.0 (TID 9) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
	- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
	- element of array (index: 0)
	- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
21/07/06 18:00:36 ERROR JobScheduler: Error running job streaming job 1625565624000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 9) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
	- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
	- element of array (index: 0)
	- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:135)
	at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:49)
	at org.apache.spark.streaming.dstream.DStream.$anonfun$print$3(DStream.scala:736)
	at org.apache.spark.streaming.dstream.DStream.$anonfun$print$3$adapted(DStream.scala:735)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
	at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

根据错误显示

java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:

消费者的消费记录序列化出现了问题,通俗来讲就是offset和partitions之间出现了记录乱序情况 需要重新进行序列化 找到正确的位置。

在设置sparkconf时 指定序列化方式就可以解决了

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

在这里插入图片描述如图所示

序列化在分布式系统中扮演着重要的角色,优化Spark程序时,首当其冲的就是对序列化方式的优化。Spark为使用者提供两种序列化方式:
Java serialization: 默认的序列化方式。
Kryo serialization: 相较于 Java serialization 的方式,速度更快,空间占用更小,但并不支持所有的序列化格式,同时使用的时候需要注册class。spark-sql中默认使用的是kyro的序列化方式。
如果你没有注册需要序列化的class,Kyro依然可以照常工作,但会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。

如果有表述错误地方敬请指出 万分感激

转发请注明出处!

文章参考:
https://blog.csdn.net/lsshlsw/article/details/50856842

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-07 21:17:05  更:2021-07-07 21:17:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/5 3:41:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码