IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Apache Spark:Task not serializable异常的排查和解决 -> 正文阅读

[大数据]Apache Spark:Task not serializable异常的排查和解决

1. 声明

当前内容主要为排查在排序的时候Spark突然出现的java.io.NotSerializableException问题,以及解决思路

2. 还原报错代码

实体类:User

import java.io.Serializable;

public class User implements Serializable {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	private Integer id;
	private String name;
	private Double score;
	// 省略getsettoStirng,构造函数
}

实体类已实现Serializable接口!

实际代码:

public class SortDataOperationTest {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setMaster("local").setAppName("sort data test");
		JavaSparkContext jsc = new JavaSparkContext(conf);
		sortOperation(jsc);
		jsc.close();
	}

	private static void sortOperation(JavaSparkContext jsc) {
		// List<Integer> intList = Arrays.asList(1, 5, 8, 4, 3, 1, 2, 9, 7, -1);
		// JavaRDD<Integer> parallelize = jsc.parallelize(intList);
		List<User> userList = Arrays.asList(new User(1, "zs", 50.2), new User(2, "ls", 70.5), new User(3, "ww", 65.2));
		// 按照key的方式进行排序的操作
		jsc.parallelize(userList).mapToPair(new PairFunction<User, Integer, User>() {

			@Override
			public Tuple2<Integer, User> call(User t) throws Exception {
				return new Tuple2<Integer, User>(t.getId(), t);
			}
		}).sortByKey().foreach(x -> System.out.println(x));

		//	按照当前的id的方式进行排序
		Comparator<User> comparator = new Comparator<User>() {

			@Override
			public int compare(User o1, User o2) {
				return o1.getId() - o2.getId();
			}
		};

		jsc.parallelize(userList).mapToPair(new PairFunction<User, User, Integer>() {
			@Override
			public Tuple2<User, Integer> call(User t) throws Exception {
				// TODO Auto-generated method stub
				return new Tuple2<User, Integer>(t, 1);
			}
		}).sortByKey(comparator).foreach(x -> System.out.println(x));

	}

}

3. 执行报错

Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
21/08/29 13:46:05 INFO DAGScheduler: Job 1 failed: foreach at SortDataOperationTest.java:78, took 0.053300 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1167)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
	at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:351)
	at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:45)
	at com.hy.spark.test.basic.SortDataOperationTest.sortOperation(SortDataOperationTest.java:78)
	at com.hy.spark.test.basic.SortDataOperationTest.main(SortDataOperationTest.java:30)
Caused by: java.io.NotSerializableException: com.hy.spark.test.basic.SortDataOperationTest$2
Serialization stack:
	- object not serializable (class: com.hy.spark.test.basic.SortDataOperationTest$2, value: com.hy.spark.test.basic.SortDataOperationTest$2@2c5f8bc8)
	- field (class: scala.math.LowPriorityOrderingImplicits$$anon$7, name: cmp$2, type: interface java.util.Comparator)
	- object (class scala.math.LowPriorityOrderingImplicits$$anon$7, scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f)
	- field (class: scala.Some, name: x, type: class java.lang.Object)
	- object (class scala.Some, Some(scala.math.LowPriorityOrderingImplicits$$anon$7@ee0c35f))
	- field (class: org.apache.spark.ShuffleDependency, name: keyOrdering, type: class scala.Option)
	- object (class org.apache.spark.ShuffleDependency, org.apache.spark.ShuffleDependency@51a9145b)
	- field (class: scala.Tuple2, name: _2, type: class java.lang.Object)
	- object (class scala.Tuple2, (MapPartitionsRDD[4] at mapToPair at SortDataOperationTest.java:72,org.apache.spark.ShuffleDependency@51a9145b))
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1155)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1071)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1074)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:1073)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1073)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1014)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2069)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)

错误报了一大堆,但是前面的没有报错啊!
在这里插入图片描述

4.排查错误

说明当前的sortByKey()是没有任何问题,所以到处找那个类没有实现序列化接口…

开始检查User是否实现序列化接口,看到这个 com.hy.spark.test.basic.SortDataOperationTest$2,这个好像是匿名的导致的?

最后找到字段属性
在这里插入图片描述
感觉Spark应该没这个笨让Object对象出现问题,然后开始检查里面的类型,不停的开始将所有未实现序列化的都实现,结果还是报错!

最后定位在了这个地方:(Object对象和其他的对象能有的都有了,不能动的也无法实现序列化,最后发现可能是Comparator这个是匿名的未实现序列化接口导致的)
在这里插入图片描述

5.验证结果

将匿名的函数提取出来变成实现类并实现序列化接口

static class UserComparator implements Comparator<User>, Serializable {

	@Override
	public int compare(User o1, User o2) {
		return (int) (o1.getScore() - o2.getScore());
	}
}

最后将代码中的匿名的Comparator替换成UserComparator,结果就成功了,为了方便查看结果设置日志级别为WARN(jsc.setLogLevel("WARN");)

在这里插入图片描述

使用成功,说明在使用Spark的时候比较器一定要手动使用实现类,并实现序列化接口,否则在排序操作的时候就会出现序列化问题的!!!(小心匿名类的使用)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:08:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:56:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码