2.6 Developing Spark SQL in IDEA
2.6.1 Add the Dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
2.6.2 Code Implementation
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object Spark01_SparkSQL_Basic {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // DataFrame: read a JSON file and query it with SQL
    val df: DataFrame = spark.read.json("datas/user.json")
    df.show()
    df.createOrReplaceTempView("user")
    spark.sql("select * from user").show()

    // DSL syntax: plain column names, or 'col symbols when an expression is involved
    df.select("age", "username").show()
    df.select('age - 1, 'username).show()

    // Dataset: created directly from a Scala collection
    val seq = Seq(1, 2, 3, 4)
    val ds: Dataset[Int] = seq.toDS()
    ds.show()

    // Conversions between RDD, DataFrame and Dataset
    val rdd = spark.sparkContext.makeRDD(List((1, "谢清照", 21), (2, "朱玮琦", 20)))
    val df1: DataFrame = rdd.toDF("id", "name", "age") // RDD => DataFrame
    val rowRDD: RDD[Row] = df1.rdd                     // DataFrame => RDD
    val ds1: Dataset[User] = df1.as[User]              // DataFrame => Dataset
    val df2: DataFrame = ds1.toDF()                    // Dataset => DataFrame
    val ds2: Dataset[User] = rdd.map {                 // RDD => Dataset
      case (id, name, age) => User(id, name, age)
    }.toDS()
    val userRDD: RDD[User] = ds2.rdd                   // Dataset => RDD

    spark.close()
  }

  case class User(id: Int, name: String, age: Int)
}
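Note that toDF, toDS and as[User] above all depend on import spark.implicits._, which brings the implicit encoders and conversion methods of the concrete SparkSession instance into scope; without it these conversions do not compile.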
2.7 User-Defined Functions
- Users can add their own functions through the spark.udf facility to implement custom logic.
2.7.1 UDF
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

val df: DataFrame = spark.read.json("datas/user.json")
df.createOrReplaceTempView("user")

// Register a UDF named "prefixName" that prepends a label to the username
spark.udf.register("prefixName", (name: String) => {
  "Name: " + name
})
spark.sql("select age, prefixName(username) from user").show()
2.7.2 UDAF
- Both the strongly typed Dataset and the weakly typed DataFrame provide aggregate functions such as count(), countDistinct(), avg(), max() and min(). Beyond these, users can define their own aggregate functions: extending UserDefinedAggregateFunction yields a weakly typed user-defined aggregate function. Since Spark 3.0, UserDefinedAggregateFunction is deprecated, and the strongly typed Aggregator is recommended across the board.
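For comparison, the built-in avg already covers this requirement directly. A one-line sketch, assuming a DataFrame df read from the same user.json:

import org.apache.spark.sql.functions.avg
df.select(avg("age")).show()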
Requirement: compute the average age.
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
object Spark03_SparkSQL_UDAF {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    // Register the weakly typed UDAF (deprecated since Spark 3.0)
    spark.udf.register("ageAvg", new MyAvgUDAF)
    spark.sql("select ageAvg(age) from user").show()

    spark.close()
  }

  class MyAvgUDAF extends UserDefinedAggregateFunction {
    // Schema of the input column(s)
    override def inputSchema: StructType = {
      StructType(
        Array(
          StructField("age", LongType)
        )
      )
    }

    // Schema of the aggregation buffer: running total and row count
    override def bufferSchema: StructType = {
      StructType(
        Array(
          StructField("total", LongType),
          StructField("count", LongType)
        )
      )
    }

    // Type of the final result
    override def dataType: DataType = LongType

    // The same input always produces the same output
    override def deterministic: Boolean = true

    // Initialize the buffer
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0L)
      buffer.update(1, 0L)
    }

    // Fold one input row into the buffer
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer.update(0, buffer.getLong(0) + input.getLong(0))
      buffer.update(1, buffer.getLong(1) + 1)
    }

    // Merge two partial buffers (e.g. from different partitions)
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
      buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
    }

    // Compute the final result (integer division, so the average is truncated)
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0) / buffer.getLong(1)
    }
  }
}
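The same average can be written with the strongly typed Aggregator recommended since Spark 3.0. functions.udaf wraps the Aggregator as an ordinary UserDefinedFunction, so it can still be registered under a name and called from SQL: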
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}

object Spark03_SparkSQL_UDAF1 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    // functions.udaf adapts the strongly typed Aggregator for use in SQL
    spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF))
    spark.sql("select ageAvg(age) from user").show()

    spark.close()
  }

  // Mutable aggregation buffer: running total and row count
  case class Buff(var total: Long, var count: Long)

  // Aggregator[IN, BUF, OUT]: input type, buffer type, output type
  class MyAvgUDAF extends Aggregator[Long, Buff, Long] {
    // Initial (zero) value of the buffer
    override def zero: Buff = Buff(0L, 0L)

    // Fold one input value into the buffer
    override def reduce(buff: Buff, in: Long): Buff = {
      buff.total += in
      buff.count += 1
      buff
    }

    // Merge two partial buffers
    override def merge(buff1: Buff, buff2: Buff): Buff = {
      buff1.total += buff2.total
      buff1.count += buff2.count
      buff1
    }

    // Compute the final result
    override def finish(buff: Buff): Long = buff.total / buff.count

    // Encoders for the buffer and the output
    override def bufferEncoder: Encoder[Buff] = Encoders.product
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}
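An Aggregator can also be used in a strongly typed Dataset query without any SQL registration, by converting it to a TypedColumn. The sketch below is an assumption built on the same user.json data; the class and field names (MyAvgAgeUDAF, User with username and age) are illustrative, and the Aggregator now takes the whole User row as input instead of a single Long column:

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn}

object Spark03_SparkSQL_UDAF2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // Read the same file and view each row as a typed User object
    val ds: Dataset[User] = spark.read.json("datas/user.json").as[User]

    // Turn the Aggregator into a TypedColumn and use it in a typed select
    val avgCol: TypedColumn[User, Long] = new MyAvgAgeUDAF().toColumn.name("avgAge")
    ds.select(avgCol).show()

    spark.close()
  }

  case class User(username: String, age: Long) // JSON infers age as Long
  case class Buff(var total: Long, var count: Long)

  // Aggregator parameterized on the whole input row (User)
  class MyAvgAgeUDAF extends Aggregator[User, Buff, Long] {
    override def zero: Buff = Buff(0L, 0L)
    override def reduce(buff: Buff, user: User): Buff = {
      buff.total += user.age
      buff.count += 1
      buff
    }
    override def merge(buff1: Buff, buff2: Buff): Buff = {
      buff1.total += buff2.total
      buff1.count += buff2.count
      buff1
    }
    override def finish(buff: Buff): Long = buff.total / buff.count
    override def bufferEncoder: Encoder[Buff] = Encoders.product
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}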
Note: this post is a set of study notes; if there is any infringement, please let me know and it will be removed. Original video: https://www.bilibili.com/video/BV11A411L7CK