IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SparkSQL 自定义函数 -> 正文阅读

[大数据]SparkSQL 自定义函数

1. UDF

在 Hive 中,我们实现的 UDF 必须将方法命名为 evaluate ,而 Spark SQL 中却没有这么无理的要求,我们可以根据所需随意自定义函数。

语法格式:

spark.udf.register(函数名,函数体)

🌰 将日期变化格式:

原数据 birthday.txt 预览:

Michael, 2020/Nov/12 15:34:56
Andy, 2020/Dec/05 17:27:38
Justin, 2020/Dec/27 22:48:23

程序实现:

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
    .appName("UDF")
    .master("local[2]")
    .getOrCreate()

    val sc = SparkContext.getOrCreate()

    import spark.implicits._

    // 加载数据源,将其转化为 DataFrame
    var df = sc.textFile("birthday.txt")
    .map(_.split(","))
    .map(line => (line(0), line(1)))
    .toDF("name", "birthday") // 转型时指定字段的名称


    // 自定义函数的实现
    spark.udf.register("TranBirth", (dt: String) => {
        // 日期的输入格式(US)
        val parser = new SimpleDateFormat("yyyy/MMM/dd HH:mm:ss", Locale.US)
        // 日期的输出格式
        val formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss")
        // 将输入日期转型
        formatter.format(parser.parse(dt))
    })

    // 建立临时视图
    df.createOrReplaceTempView("birthday")

    // SQL 语句中使用自定义函数
    spark.sql("select name, TranBirth(birthday.birthday) from birthday").show()

}

输出:

+-------+-------------------+
|   name|TranBirth(birthday)|
+-------+-------------------+
|Michael|12-11-2020 15:34:56|
|   Andy|05-12-2020 17:27:38|
| Justin|27-12-2020 22:48:23|
+-------+-------------------+

2. UDAF

强类型的 DataSet 和弱类型的 DataFrame 都提供了相关的聚合函数,如 count()countDistinct()avg()min() 等。除此之外,用于可以设定自己的聚合函数,通过继承 UserDefinedAggregateFunction 实现用户自定义弱类型函数,自 Spark 3.0 之后,UserDefinedAggregateFunction 已不推荐使用了,可以统一采用强类型聚合函数 Aggergator

2.1 RDD 实现

🌰实例:计算平均工资

val rdd = sc.makeRDD(List(("Michael", 3000),("Andy", 3300), ("Justin", 4500)))
  .map{
    case(name, age) => (age, 1)
  }
  .reduce((t1, t2) => (t1._1 + t2._1 , t1._2 +  t2._2))
println(rdd._1 / rdd._2 * 1.0)  // 输出: 3600.0

2.2 UDAF 弱类型实现

🌰实例:计算平均工资

数据预览 employees.json

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}

自定义类,继承 UserDefinedAggregateFunction 并实现其中的方法。

class AverageUDAF extends UserDefinedAggregateFunction {

    // 聚合函数输入参数的数据类型
    override def inputSchema: StructType = 
    StructType(Array(StructField("salary", IntegerType)))

    // 聚合函数缓冲区中值的数据类型(age,count)
    override def bufferSchema: StructType =
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))

    // 函数返回值的数据类型
    override def dataType: DataType = DoubleType

    // 稳定性:对于相同的输入是否一直返回相同的输出。
    override def deterministic: Boolean = true

    // 函数缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 0L
        buffer(1) = 0L
    }

    // 更新缓冲区中的数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
            buffer(0) = buffer.getLong(0) + input.getInt(0)
            buffer(1) = buffer.getLong(1) + 1
        }
    }

    // 合并缓冲区
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
        buffer1(0) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // 计算最终结果
    override def evaluate(buffer: Row): Double = 
    buffer.getLong(0) / buffer.getLong(1) * 1.0
}
spark.udf.register("AverageUDAF", new AverageUDAF)
val df = spark.read.json("employees.json")

df.createOrReplaceTempView("employees")

spark.sql("select name, AverageUDAF(salary) from employees").show()

2.3 UDAF 强类型

🌰实例:计算平均工资

数据预览 employees.json

{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}

自定义类,继承 Aggregator 并实现其中的方法。

import org.apache.spark.sql.expressions.Aggregator

// 输入数据类型
case class Emp(name: String, salary: Long)

// 缓冲数据类型
case class AvgBuffer(var sum: Long, var count: Long)

class AgeUDAF extends Aggregator[Emp, AvgBuffer, Double] {

    override def zero: AvgBuffer = AvgBuffer(0L, 0L)

    override def reduce(b: AvgBuffer, a: Emp): AvgBuffer = {
        b.sum = b.sum + a.salary
        b.count += 1
        b
    }

    override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
        b1.sum += b2.sum
        b1.count += b2.count
        b1
    }

    override def finish(reduction: AvgBuffer): Double =
    reduction.sum.toDouble / reduction.count

    override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product

    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
spark.udf.register("AgeUDAF", functions.udaf(new AgeUDAF))
val df = spark.read.json("employees.json")

df.createOrReplaceTempView("employees")

spark.sql("select AgeUDAF(salary) from employees").show()

?


???END???
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-23 15:49:12  更:2021-12-23 15:49:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 7:52:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码