1. UDF
In Hive, a UDF implementation must name its method evaluate. Spark SQL imposes no such arbitrary requirement, so we are free to define functions however we need.
Syntax:
spark.udf.register(functionName, functionBody)
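For example, a minimal sketch of registering and calling a UDF (the function name toUpper and the people table are illustrative, not part of the example that follows):
// Register a trivial UDF that upper-cases a string, then call it from SQL.
spark.udf.register("toUpper", (s: String) => s.toUpperCase)
spark.sql("SELECT toUpper(name) FROM people").show()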
🌰 Example: convert the date format.
Preview of the source data birthday.txt:
Michael, 2020/Nov/12 15:34:56
Andy, 2020/Dec/05 17:27:38
Justin, 2020/Dec/27 22:48:23
Implementation:
import java.text.SimpleDateFormat
import java.util.Locale
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession.builder()
    .appName("UDF")
    .master("local[2]")
    .getOrCreate()
  val sc = spark.sparkContext
  import spark.implicits._

  val df = sc.textFile("birthday.txt")
    .map(_.split(",").map(_.trim)) // drop the space after the comma
    .map(fields => (fields(0), fields(1)))
    .toDF("name", "birthday")

  // Parse "2020/Nov/12 15:34:56" (English month abbreviation, hence Locale.US)
  // and re-render it as "12-11-2020 15:34:56".
  spark.udf.register("TranBirth", (dt: String) => {
    val parser = new SimpleDateFormat("yyyy/MMM/dd HH:mm:ss", Locale.US)
    val formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
    formatter.format(parser.parse(dt))
  })

  df.createOrReplaceTempView("birthday")
  spark.sql("select name, TranBirth(birthday) from birthday").show()
}
Output:
+-------+-------------------+
| name|TranBirth(birthday)|
+-------+-------------------+
|Michael|12-11-2020 15:34:56|
| Andy|05-12-2020 17:27:38|
| Justin|27-12-2020 22:48:23|
+-------+-------------------+
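The same conversion can also be applied through the DataFrame API by wrapping the lambda with functions.udf; a sketch assuming the imports and the df built above (the column name birthday_fmt is illustrative):

import org.apache.spark.sql.functions.{col, udf}

// Wrap the same conversion so it can be applied to a Column directly.
val tranBirth = udf((dt: String) => {
  val parser = new SimpleDateFormat("yyyy/MMM/dd HH:mm:ss", Locale.US)
  val formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
  formatter.format(parser.parse(dt))
})
df.withColumn("birthday_fmt", tranBirth(col("birthday"))).show()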
2. UDAF
Both the strongly typed Dataset and the weakly typed DataFrame provide built-in aggregate functions such as count(), countDistinct(), avg(), and min(). Beyond these, users can define their own aggregate functions. Weakly typed user-defined aggregates used to be written by extending UserDefinedAggregateFunction, but since Spark 3.0 that class is deprecated; the strongly typed Aggregator is now the recommended, unified approach.
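As a quick sketch of the built-in aggregates (assuming a DataFrame df with name and salary columns):

import org.apache.spark.sql.functions.{avg, countDistinct, min}

// Built-in aggregates work directly on an untyped DataFrame.
df.select(avg("salary"), countDistinct("name"), min("salary")).show()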
2.1 RDD Implementation
🌰 Example: compute the average salary
val (sum, count) = sc.makeRDD(List(("Michael", 3000), ("Andy", 3300), ("Justin", 4500)))
  .map {
    case (name, salary) => (salary, 1) // keep the salary plus a count of 1
  }
  .reduce((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2))
println(sum * 1.0 / count) // convert to Double before dividing
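The same average can also be computed with a single aggregate call, which avoids building the intermediate (salary, 1) pairs; a minimal sketch over the same data:

val rdd = sc.makeRDD(List(("Michael", 3000), ("Andy", 3300), ("Justin", 4500)))
val (total, n) = rdd.aggregate((0, 0))(
  (acc, kv) => (acc._1 + kv._2, acc._2 + 1), // fold one record into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)       // merge accumulators across partitions
)
println(total.toDouble / n)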
2.2 UDAF: Weakly Typed Implementation
🌰 Example: compute the average salary
Preview of employees.json:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
Define a class that extends UserDefinedAggregateFunction and implement its methods.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class AverageUDAF extends UserDefinedAggregateFunction {
  // Input: a single salary column (spark.read.json parses numbers as LongType)
  override def inputSchema: StructType =
    StructType(Array(StructField("salary", LongType)))
  // Aggregation buffer: a running sum and a running count
  override def bufferSchema: StructType =
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))
  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  override def evaluate(buffer: Row): Double =
    buffer.getLong(0).toDouble / buffer.getLong(1) // convert before dividing
}
spark.udf.register("AverageUDAF", new AverageUDAF)
val df = spark.read.json("employees.json")
df.createOrReplaceTempView("employees")
spark.sql("select name, AverageUDAF(salary) from employees").show()
2.3 UDAF: Strongly Typed Implementation
🌰 Example: compute the average salary
Preview of employees.json:
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
Define a class that extends Aggregator and implement its methods. Because the function is later called as AvgSalaryUDAF(salary) with a single column, the Aggregator's input type IN must be the bare salary value (Long), not a whole-row case class.
import org.apache.spark.sql.{Encoder, Encoders, functions}
import org.apache.spark.sql.expressions.Aggregator

case class Emp(name: String, salary: Long) // one record of employees.json (used in the Dataset example below)
case class AvgBuffer(var sum: Long, var count: Long)

// IN = Long (a single salary), BUF = AvgBuffer, OUT = Double
class AvgSalaryUDAF extends Aggregator[Long, AvgBuffer, Double] {
  override def zero: AvgBuffer = AvgBuffer(0L, 0L)
  override def reduce(b: AvgBuffer, salary: Long): AvgBuffer = {
    b.sum += salary
    b.count += 1
    b
  }
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  override def finish(reduction: AvgBuffer): Double =
    reduction.sum.toDouble / reduction.count
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
spark.udf.register("AgeUDAF", functions.udaf(new AgeUDAF))
val df = spark.read.json("employees.json")
df.createOrReplaceTempView("employees")
spark.sql("select AgeUDAF(salary) from employees").show()