A Spark Streaming program that reads data from Kafka, processes it, and writes the results to MySQL fails with the following error:
ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 9)
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:489)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/07/06 18:00:36 ERROR TaskSetManager: Task 0.0 in stage 2.0 (TID 9) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
21/07/06 18:00:36 ERROR JobScheduler: Error running job streaming job 1625565624000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 9) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = liuliu, partition = 0, leaderEpoch = 8, offset = 119956, CreateTime = 1625491767986, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 1625491767985 南 山东 51 01))
- element of array (index: 0)
- array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:135)
at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:49)
at org.apache.spark.streaming.dstream.DStream.$anonfun$print$3(DStream.scala:736)
at org.apache.spark.streaming.dstream.DStream.$anonfun$print$3$adapted(DStream.scala:735)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The key lines in the error are:
java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
The consumer records themselves failed to serialize: ConsumerRecord does not implement java.io.Serializable, so whenever whole records have to leave the executors, the default Java serializer cannot encode them. The stack trace shows exactly where that happens here: DStream.print() calls KafkaRDD.take(), which ships an array of ConsumerRecord objects back to the driver as a task result, and the task fails at that point.
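To isolate the mechanism, here is a minimal sketch that reproduces the same failure without Kafka. Everything in it (the Payload class, the object and app names) is hypothetical and invented for illustration; the point is that any task result whose class does not implement java.io.Serializable fails the same way under the default serializer:

import org.apache.spark.{SparkConf, SparkContext}

// Like ConsumerRecord, this class does not implement java.io.Serializable.
class Payload(val value: String)

object NotSerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("not-serializable-demo").setMaster("local[2]"))

    // collect() ships task results from the executors back to the driver.
    // With the default Java serializer this throws
    // java.io.NotSerializableException: Payload
    sc.parallelize(Seq("a", "b")).map(s => new Payload(s)).collect()

    sc.stop()
  }
}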
Specifying the serializer when building the SparkConf resolves it, because Kryo does not require classes to implement java.io.Serializable:
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Serialization plays an important role in any distributed system, and the serializer is one of the first things worth tuning in a Spark program. Spark offers two serializers:

Java serialization: the default. It handles anything that implements java.io.Serializable, but it is comparatively slow and produces large output.

Kryo serialization: faster and more compact than Java serialization, and it does not require classes to implement Serializable. It does not support every serializable type out of the box, though, and to get the full benefit you need to register your classes. (Since Spark 2.0.0, Spark already uses Kryo internally when shuffling RDDs of simple types, arrays of simple types, and strings.)

If you do not register the classes you serialize, Kryo still works, but it stores the full class name with every object, which can easily waste more space than the default Java serialization.
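As a rough sketch of what registration looks like (CityRecord is a hypothetical class standing in for your own domain types):

import org.apache.spark.SparkConf

// Hypothetical domain class, used only to illustrate registration.
case class CityRecord(ts: Long, city: String, count: Int)

val conf = new SparkConf()
  .setAppName("kryo-registration-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registered classes are written as small integer ids instead of their
  // full class names, which keeps serialized objects compact.
  .registerKryoClasses(Array(classOf[CityRecord]))
  // Optional: fail fast when an unregistered class is serialized. Expect to
  // register more classes (e.g. array types) if you turn this on.
  .set("spark.kryo.registrationRequired", "true")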
If anything here is stated incorrectly, please point it out; it would be much appreciated.
Please credit the source when reposting!
Reference: https://blog.csdn.net/lsshlsw/article/details/50856842