1. Preface
This post walks through how I tracked down a java.io.NotSerializableException that Spark suddenly threw during a sort, and how I fixed it.
2. Reproducing the error
Entity class: User
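import java.io.Serializable;
public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private Integer id;
    private String name;
    private Double score;
    public User(Integer id, String name, Double score) {
        this.id = id; this.name = name; this.score = score;
    }
    public Integer getId() { return id; }
    public String getName() { return name; }
    public Double getScore() { return score; }
    @Override public String toString() { return "User(" + id + ", " + name + ", " + score + ")"; }
}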
The entity class already implements the Serializable interface!
The actual code:
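import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
public class SortDataOperationTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("sort data test");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        sortOperation(jsc);
        jsc.close();
    }
    private static void sortOperation(JavaSparkContext jsc) {
        List<User> userList = Arrays.asList(new User(1, "zs", 50.2), new User(2, "ls", 70.5), new User(3, "ww", 65.2));
        // Sort 1: Integer keys with their natural ordering -> runs fine
        jsc.parallelize(userList).mapToPair(new PairFunction<User, Integer, User>() {
            @Override
            public Tuple2<Integer, User> call(User t) throws Exception {
                return new Tuple2<Integer, User>(t.getId(), t);
            }
        }).sortByKey().foreach(x -> System.out.println(x));
        // Anonymous Comparator: java.util.Comparator does not extend Serializable,
        // so this object cannot be shipped with the task
        Comparator<User> comparator = new Comparator<User>() {
            @Override
            public int compare(User o1, User o2) {
                return Integer.compare(o1.getId(), o2.getId());
            }
        };
        // Sort 2: User keys ordered by the custom comparator -> fails with NotSerializableException
        jsc.parallelize(userList).mapToPair(new PairFunction<User, User, Integer>() {
            @Override
            public Tuple2<User, Integer> call(User t) throws Exception {
                return new Tuple2<User, Integer>(t, 1);
            }
        }).sortByKey(comparator).foreach(x -> System.out.println(x));
    }
}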
3. The error at runtime
Serialization stack:
- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
21/08/29 13:46:05 INFO DAGScheduler: Job 1 failed: foreach at SortDataOperationTest.java:78, took 0.053300 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1167)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
at com.hy.spark.test.basic.SortDataOperationTest.sortOperation(SortDataOperationTest.java:78)
at com.hy.spark.test.basic.SortDataOperationTest.main(SortDataOperationTest.java:30)
Caused by: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
- field (class: scala.Some, name: x, type: class java.lang.Object)
- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1155)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
A whole wall of errors, and yet the first sort ran without any error!
4. Troubleshooting
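So the plain sortByKey() call itself is fine, and I went hunting everywhere for the class that doesn't implement the serialization interface...
First I double-checked that User implements Serializable (it does). Then I noticed com.hy.spark.test.basic.SortDataOperationTest$2 in the trace: a $ followed by a number is how anonymous inner classes are named, so could an anonymous class be the cause?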
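The `$2` suffix is a useful clue. The JLS only requires an anonymous class's binary name to be the enclosing class name, `$`, and some digits; in practice javac numbers them in the order they appear in the source. A minimal sketch (the class `AnonymousNaming` and its methods are made up for illustration) shows the naming. By the same rule, in SortDataOperationTest `$1` would be the first PairFunction, `$2` the Comparator and `$3` the second PairFunction.

```java
public class AnonymousNaming {
    // Each `new Object() {}` below declares a separate anonymous class.
    public static Object first() {
        return new Object() {};
    }

    public static Object second() {
        return new Object() {};
    }

    public static void main(String[] args) {
        // With javac these print AnonymousNaming$1 and AnonymousNaming$2,
        // numbered in source order.
        System.out.println(first().getClass().getName());
        System.out.println(second().getClass().getName());
        System.out.println(first().getClass().isAnonymousClass()); // true
    }
}
```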
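Next I went through the field entries. I assumed Spark wasn't clumsy enough to trip over a plain Object field, so I checked the concrete types behind them and made everything I could find implement Serializable, but the error was still there!
In the end I pinned it down here. Read from the bottom up, the serialization stack shows the chain: the ShuffleDependency has a keyOrdering field holding a Some, which wraps a Scala Ordering (LowPriorityOrderingImplicits$$anon$7) built around my Comparator, and that Ordering keeps the Comparator in its field cmp$2. The object at the end of the chain, SortDataOperationTest$2, is the anonymous Comparator. The Object fields and the Scala wrapper I can't change were never the problem. The anonymous PairFunction classes are fine because Spark's PairFunction interface extends Serializable, but java.util.Comparator does not, so the anonymous Comparator is not serializable.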
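The diagnosis can be reproduced without Spark. In the trace, the task is written through JavaSerializationStream.writeObject, which delegates to plain java.io.ObjectOutputStream, so writing the comparator to an ObjectOutputStream should fail in the same way. A minimal sketch: the names `SerializationCheck`, `IdComparator` and `isSerializable` are hypothetical, and Integer keys stand in for User to keep it self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class SerializationCheck {

    // Named comparator that also implements Serializable (the fix).
    public static class IdComparator implements Comparator<Integer>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public int compare(Integer a, Integer b) {
            return Integer.compare(a, b);
        }
    }

    // Anonymous comparator, like the one in sortOperation: implements only Comparator.
    public static Comparator<Integer> anonymousComparator() {
        return new Comparator<Integer>() {
            @Override
            public int compare(Integer a, Integer b) {
                return Integer.compare(a, b);
            }
        };
    }

    // Writes obj with plain Java serialization; returns false and prints the
    // offending class name when something in the object graph is not Serializable.
    public static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(anonymousComparator())); // prints the anonymous class name, then false
        System.out.println(isSerializable(new IdComparator()));    // true
    }
}
```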
5. Verifying the fix
Extract the anonymous class into a named class that implements both Comparator and Serializable:
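// Nested in SortDataOperationTest (needs java.io.Serializable and java.util.Comparator imports)
static class UserComparator implements Comparator<User>, Serializable {
    private static final long serialVersionUID = 1L;
    @Override
    public int compare(User o1, User o2) {
        // Double.compare rather than (int) (a - b): the cast truncates differences below 1.0 to 0
        return Double.compare(o1.getScore(), o2.getScore());
    }
}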
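Finally, I replaced the anonymous Comparator in the code with UserComparator (i.e. .sortByKey(new UserComparator())) and the job succeeded. To make the output easier to read, I set the log level to WARN (jsc.setLogLevel("WARN");).
It works. So when using Spark, a comparator passed to a sort must be serializable: write it as a named class that implements both Comparator and Serializable, otherwise the sort fails with a serialization error!!! (Watch out for anonymous classes.)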
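A named class is not the only option. Assuming Java 8 or later, an intersection cast makes a lambda serializable, which should meet the same requirement. The sketch below (class and field names are hypothetical) checks this with plain Java serialization rather than a Spark job; in the job it would be passed the same way, e.g. sortByKey(...) with a Comparator<User> built the same way.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SerializableLambdaComparator {

    // Intersection cast: the lambda's target type includes Serializable,
    // so the compiler makes the lambda object serializable.
    public static final Comparator<Integer> BY_VALUE =
            (Comparator<Integer> & Serializable) (a, b) -> Integer.compare(a, b);

    // The same lambda targeting only Comparator is NOT serializable.
    public static final Comparator<Integer> PLAIN = (a, b) -> Integer.compare(a, b);

    // True if obj can be written with plain Java serialization.
    public static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>(Arrays.asList(3, 1, 2));
        keys.sort(BY_VALUE);
        System.out.println(keys);                     // [1, 2, 3]
        System.out.println(isSerializable(BY_VALUE)); // true
        System.out.println(isSerializable(PLAIN));    // false
    }
}
```

The named-class approach from the post is more explicit; the cast keeps the comparator inline but is easy to forget, and a plain lambda silently falls back to the non-serializable case.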