IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SPARK-SQL-之UDF、UDAF -> 正文阅读

[大数据]SPARK-SQL-之UDF、UDAF

SPARK-SQL-之UDF、UDAF

1、UDF使用

// 注册函数    
spark.udf.register("prefix1", (name: String) => {
    "Name:" + name
})
// 使用函数
spark.sql("select *,prefix1(name) from users").show()

2、UDAF使用

2.1 弱类型

// 1 定义UDAF(弱类型、3.0.0之前得版本可以使用,没标记过时)
package com.shufang.rdd_ds_df

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}

class MyUDAF extends UserDefinedAggregateFunction {
  // IN
  override def inputSchema: StructType = {
    StructType(
      Array(
        StructField("age", LongType)
      )
    )
  }

  // MIDDLE 缓冲区类型
  override def bufferSchema: StructType = {
    StructType(
      Array(
        StructField("total", LongType),
        StructField("count", LongType)
      )
    )
  }

  // OUT
  override def dataType: DataType = LongType

  // 函数的稳定性
  override def deterministic: Boolean = {
    true
  }

  // 缓冲器的初始化
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    /*buffer(0) = 0L
    buffer(1) = 0L*/
    buffer.update(0, 0L)
    buffer.update(1, 0L)
  }

  // 根据输入的值更新缓冲区
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer.update(0, buffer.getLong(0) + input.getLong(0))
    buffer.update(1, buffer.getLong(1) + 1)
  }

  // 合并多个缓冲区
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
    buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
  }

  // 计算平均值
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0)/buffer.getLong(1)
  }
}


// 2 注册&使用
spark.udf.register("ageAvg", new MyUDAF)
spark.sql("select ageAvg(id) as av from users").show()

2.2 强类型(spark 3.0.0之后推荐使用)

// 1 声明并实现
package com.shufang.rdd_ds_df

import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}

/**
 * Aggregator[IN, BUF, OUT] should now be registered as a UDF" + via the functions.udaf(agg) method.", "3.0.0"
 */
case class Buff(var total:Long ,var count:Long)
class MyUDAF1 extends Aggregator[Long,Buff,Long] {
  //缓冲区初始化
  override def zero: Buff = Buff(0L,0L)
  //将进来的元素与缓冲区进行合并
  override def reduce(b: Buff, a: Long): Buff = {
    b.count +=1
    b.total += a
    b
  }
  //合并多个缓冲区
  override def merge(b1: Buff, b2: Buff): Buff = {
    b1.count  = b1.count + b2.count
    b1.total  = b1.total + b2.total
    b1
  }

 // 计算最终结果
  override def finish(buff: Buff): Long = {
    buff.total/buff.count
  }

 // 定义序列化编码器
  override def bufferEncoder: Encoder[Buff] = Encoders.product
 //定义序列化编码器
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}


// 2 注册并使用,注册方式不一样
spark.udf.register("ageAvg", functions.udaf(new MyUDAF1()))
spark.sql("select ageAvg(id) as av from users").show()
 

2.3 早期版本使用强类型UDAF

如果是3.0.0之前的版本需要使用强类型,需要结合DSL sparkSQL的领域语言

// 1 声明,相当于DS的每一行相当于传入的参数
package com.shufang.rdd_ds_df

    import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}

/**
 * Aggregator[IN, BUF, OUT] should now be registered as a UDF" + via the functions.udaf(agg) method.", "3.0.0"
 */
//case class Buff(var total:Long ,var count:Long)
class MyUDAF2 extends Aggregator[User,Buff,Long] {
    //缓冲区初始化
    override def zero: Buff = Buff(0L,0L)

        override def reduce(b: Buff, a: User): Buff = {
        b.count +=1
            b.total += a.id
            b
    }

    override def merge(b1: Buff, b2: Buff): Buff = {
        b1.count  = b1.count + b2.count
            b1.total  = b1.total + b2.total
            b1
    }

    override def finish(buff: Buff): Long = {
        buff.total/buff.count
    }

    override def bufferEncoder: Encoder[Buff] = Encoders.product

        override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

// 2 使用
val column: TypedColumn[User, Long] = new MyUDAF2().toColumn
val ds: Dataset[User] = df.as[User]
ds.select(column).show()

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-26 12:10:28  更:2021-08-26 12:10:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 17:06:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码