Background
In Spark SQL, after grouping a DataFrame and selecting one record from each group according to a rule, the job failed with the following error: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Analysis
DataFrame is just an alias for Dataset[Row], and its map() method hides a trap: the mapping function may turn each row into an object of any type at all (even a non-Row object!), and Spark's built-in reflection cannot derive encoders for arbitrary types automatically, hence the error above.
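For instance, here is a minimal sketch of the trap (identityMap is a made-up name for illustration; it assumes a Spark version where the internal RowEncoder.apply(schema) used in this article's snippet is still available):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

def identityMap(df: DataFrame): DataFrame = {
  // df.map(row => row)  // does not compile: there is no implicit Encoder[Row],
  //                     // even with spark.implicits._ in scope
  df.map(row => row)(RowEncoder(df.schema))  // compiles: the encoder is supplied explicitly
}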
Solution
Assume the following scenario: at a secondary school, students sitting the graduation assessment may retake the exam for any course whose earlier score they are unhappy with, replacing the previous score; once the assessment is over, the academic affairs office needs to compile each student's latest score for every course.
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import spark.implicits._  // pre-imported in spark-shell; supplies the encoder for the (Long, String) grouping key

// Target schema for the rows produced by map(); matches the DataFrame's column layout
val schema = StructType(Nil).add("id", LongType).add("kcCode", StringType).add("score", DoubleType).add("numOfTimes", IntegerType)

// Sample data: student id, course code, score, attempt number
val df = spark.createDataFrame(Seq(
  (2021314001L, "KC001", 59d, 1), (2021314001L, "KC001", 89.5, 2),
  (2021314001L, "KC002", 99d, 1), (2021314002L, "KC001", 100d, 1))).toDF("id", "kcCode", "score", "numOfTimes")

// For each (student, course) pair, keep the row from the latest attempt
df.groupByKey(row => (row.getAs[Long]("id"), row.getAs[String]("kcCode")))
  .reduceGroups((row01, row02) =>
    if (row01.getAs[Int]("numOfTimes") > row02.getAs[Int]("numOfTimes")) row01 else row02)
  .map(_._2)(RowEncoder(schema))  // the explicit Encoder[Row] is what avoids the error
  .show
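Since reduceGroups leaves each Row's original column layout untouched, building schema by hand is optional here; RowEncoder(df.schema) would serve equally well. Note also that RowEncoder lives in an internal catalyst package; recent Spark releases expose the public Encoders.row(schema) for the same purpose.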
Execution result
+----------+------+-----+----------+
|        id|kcCode|score|numOfTimes|
+----------+------+-----+----------+
|2021314001| KC001| 89.5|         2|
|2021314001| KC002| 99.0|         1|
|2021314002| KC001|100.0|         1|
+----------+------+-----+----------+
(row order may vary)
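As an aside, the same latest-attempt-per-group result can be reached without any custom encoder by staying in the untyped DataFrame API; here is a sketch using a window function over the same df (rn is an arbitrary helper column name):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank the attempts within each (id, kcCode) group, latest attempt first,
// then keep only the top-ranked row and drop the helper column
val latestFirst = Window.partitionBy("id", "kcCode").orderBy(col("numOfTimes").desc)
df.withColumn("rn", row_number().over(latestFirst))
  .where(col("rn") === 1)
  .drop("rn")
  .show

This variant avoids the row-by-row lambda entirely, which also leaves the whole plan visible to the Catalyst optimizer.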