IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 2.Spark SQL核心编程—IDEA 开发 Spark SQL、用户自定义函数—DSF、UDAF—强类型、弱类型 -> 正文阅读

[大数据]2.Spark SQL核心编程—IDEA 开发 Spark SQL、用户自定义函数—DSF、UDAF—强类型、弱类型

2.6 IDEA 开发 Spark SQL

  • 实际开发中,都是使用 IDEA 进行开发的。

2.6.1 添加依赖

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.12</artifactId>
	<version>3.0.0</version>
</dependency>

2.6.2 代码实现

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSQL_Basic {

  def main(args: Array[String]): Unit = {
    // TODO 创建 Spark SQL 的运行环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._

    // TODO DataFrame
    val df: DataFrame = spark.read.json("datas/user.json")
    df.show()

    // DataFrame => SQL
    df.createOrReplaceTempView("user")
    spark.sql("select * from user").show()

    // DataFrame => DSL
    //在使用 DataFrame 时,如果涉及转换操作,需要引入转换规则
    df.select("age", "username").show()
    df.select('age-1, 'username).show()

    // TODO DataSet
    // DataFrame 其实是特定泛型的 DataSet
    val seq = Seq(1, 2, 3, 4)
    val ds: Dataset[Int] = seq.toDS()
    ds.show()

    // RDD <=> DataFrame
    val rdd = spark.sparkContext.makeRDD(List((1, "谢清照", 21), (2, "朱玮琦", 20)))
    val df: DataFrame = rdd.toDF("id", "name", "age")
    val rowRDD: RDD[Row] = df.rdd

    // DataFrame <=> DataSet
    val ds: Dataset[User] = df.as[User]
    val df1: DataFrame = ds.toDF()

    // RDD <=> DataSet
    val ds1: Dataset[User] = rdd.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }.toDS()

    val userRDD: RDD[User] = ds1.rdd

    // TODO 关闭环境
    spark.close()
  }
  
  case class User(id: Int, name: String, age: Int)
  
}

2.7 用户自定义函数

  • 用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

2.7.1 UDF

  • UDF应用演示:
// TODO 创建 SparkSQL 的运行环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
import spark.implicits._

// TODO DataFrame
// 创建 `DataFrame`
val df: DataFrame = spark.read.json("datas/user.json")

// 创建临时表
df.createOrReplaceTempView("user")

// 注册 `UDF`
spark.udf.register( name = "prefixName", (name:String) => {
  "Name: " + name
})

// 应用 `UDF`
spark.sql("select age, prefixName(username) from user").show

2.7.2 UDAF

  • 强类型Dataset弱类型DataFrame 都提供了相关的聚合函数, 如 count()countDistinct()avg()max()min()。除此之外,用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数 Aggregator

需求:计算平均工资

  • (1) 实现方式 - RDD
    略…

  • (2) 实现方式 - 累加器
    略…

  • (3) 实现方式 - UDAF - 弱类型(不推荐使用)

import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object Spark03_SparkSQL_UDAF {

  def main(args: Array[String]): Unit = {
    // TODO 创建 SparkSQL 的运行环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // TODO DataFrame
    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    spark.udf.register( name = "ageAvg", new MyAvgUDAF)
    spark.sql("select ageAvg(age) from user").show

    // TODO 关闭环境
    spark.close()
  }

  /*
   * 自定义聚合函数类:计算年龄的平均值
   * 1.继承 UserDefinedAggregateFunction
   * 2.重写方法(8个)
   */
  class MyAvgUDAF extends UserDefinedAggregateFunction {
    // 输入数据的结构
    override def inputSchema: StructType = {
      StructType(
        Array(
          StructField("age", LongType)
        )
      )
    }

    // 缓冲区数据的结构: Buffer
    override def bufferSchema: StructType = {
      StructType(
        Array(
          StructField("total", LongType),
          StructField("count", LongType)
        )
      )
    }

    // 函数计算结果的数据类型: Out
    override def dataType: DataType = LongType

    // 函数的稳定性
    override def deterministic: Boolean = true

    // 缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      // buffer(0)= 0L
      // buffer(1) = 0L

      buffer.update(0, 0L)    // "total"
      buffer.update(1, 0L)    // "count"

    }

    // 根据输入的值更新缓冲区数据
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer.update(0, buffer.getLong(0) + input.getLong(0))
      buffer.update(1, buffer.getLong(1) + 1)
    }

    // 缓冲区合并
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getLong(0) + buffer2.getLong(0))
      buffer1.update(1, buffer1.getLong(1) + buffer2.getLong(1))
    }

    // 计算平均值
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0)/buffer.getLong(1)
    }
  }
}
  • (4) 实现方式 - UDAF - 强类型
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession, functions}

object Spark03_SparkSQL_UDAF1 {

  def main(args: Array[String]): Unit = {
    // TODO 创建 SparkSQL 的运行环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // TODO DataFrame
    val df: DataFrame = spark.read.json("datas/user.json")
    df.createOrReplaceTempView("user")

    spark.udf.register( name = "ageAvg", functions.udaf(new MyAvgUDAF))
    spark.sql("select ageAvg(age) from user").show

    // TODO 关闭环境
    spark.close()
  }

  /*
   * 自定义聚合函数类:计算年龄的平均值
   * 1.继承 org.apache.spark.sql.expressions.Aggregator, 定义泛型
   *    IN :输入的数据类型 Long
   *    BUF :缓冲区的数据类型 Buff
   *    OUT :输出的数据类型 Long
   * 2.重写方法(8)
   */
  case class Buff(var total: Long, var count: Long)
  class MyAvgUDAF extends Aggregator[Long, Buff, Long] {
    // z & zero :初始值或零值
    // 缓冲区的初始化
    override def zero: Buff = {
      Buff(0L, 0L)
    }

    //根据输入的数据更新缓冲区的数据
    override def reduce(buff: Buff, in: Long): Buff = {
      buff.total = buff.total + in
      buff.count = buff.count + 1
      buff
    }

    // 合并缓冲区
    override def merge(buff1 : Buff, buff2: Buff): Buff = {
      buff1.total = buff1.total + buff2.total
      buff1.count = buff1.count + buff2.count
      buff1
    }

    // 计算结果
    override def finish(buff: Buff): Long = {
      buff.total / buff.count
    }

    // 缓冲区的编码操作
    override def bufferEncoder: Encoder[Buff] = Encoders.product

    // 输出的编码操作
    override def outputEncoder: Encoder[Long] = Encoders.scalaLong
  }
}

声明:本文是学习时记录的笔记,如有侵权请告知删除!
原视频地址:https://www.bilibili.com/video/BV11A411L7CK

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-13 12:07:06  更:2021-08-13 12:08:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:53:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码