IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Flink Scala】Table API 自定义函数 -> 正文阅读

[大数据]【Flink Scala】Table API 自定义函数

作者:recommend-item-box type_blog clearfix


Flink Table SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以实 现用户自定义的函数(UDF)来解决。

内置函数

Flink Table APISQL 为用户提供了一组用于数据转换的内置函数。SQL 中支持的很多 函数,Table API SQL都已经做了实现,其它还在快速开发扩展中。

以下是一些典型函数的举例

比较函数

SQLTable API
value1 = value2ANY1 === ANY2
value1 > value2ANY1 > ANY2

逻辑函数

SQLTable API
boolean1 OR boolean2BOOLEAN1 || BOOLEAN
boolean IS FALSEBOOLEAN.isFalse
NOT boolean!BOOLEAN

算术函数

SQLTable API
numeric1 + numeric2NUMERIC1 + NUMERIC
POWER(numeric1, numeric2NUMERIC1.power(NUMERIC2)

字符串函数

SQLTable API
string1 || string2STRING1 + STRING
UPPER(string)STRING.upperCase()
CHAR_LENGTH(string)STRING.charLength()

时间函数

SQLTable API
DATE stringSTRING.toDate
TIMESTAMP stringSTRING.toTimestamp
CURRENT_TIMEcurrentTime()
INTERVAL string rangeNUMERIC.days
NUMERIC.minutes

聚合函数

SQLTable API
COUNT(*)FIELD.count
SUM([ ALL | DISTINCT ] expressionFIELD.sum0
RANK()
ROW_NUMBER()

UDF

用户定义函数(User-defined Functions,UDF)是一个重要的特性,因为它们显著地扩 展了查询(Query)的表达能力。一些系统内置函数无法解决的需求,我们可以用 UDF 来自 定义实现

注册用户自定义函数 UDF

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为 ScalaTable API注册函数。

函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数 被注册时,它被插入到 TableEnvironment 的函数目录中,这样 Table API 或 SQL 解析器就可 以识别并正确地解释它。

标量函数

用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。

为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function, 并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定, 求值方法必须公开声明并命名为 eval(直接 def 声明,没有 override)。求值方法的参数类型 和返回类型,确定了标量函数的参数和返回类型

我们自定义一个函数,输出hashcode相关的值

不要导错包

image-20220331113329082

class HashCode(factor: Int) extends ScalarFunction {
  def eval(s: String): Int = {
    s.hashCode * factor - 10000
  }
}

里面的具体实现方法必须是公开的,方法名必须是eval

类的参数是定义类是输入的

方法得参数,就是具体需要处理的数据

使用步骤

package UDF

import Source.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * 标量自定义函数
 */
object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()

    val tableEnv = StreamTableEnvironment.create(env, settings)


    val inputPath = "src/main/resources/SensorReading"
    val inputStream = env.readTextFile(inputPath)

    //转换成样例类类型
    val dataStream = inputStream.map(
      data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      }
      //选自字段作为时间戳
    ).assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading) = t.timeStamp
      })
    val sensorTable = tableEnv.fromDataStream(dataStream
      , 'id, 'temperature, 'timeStamp.rowtime as 'ts)

    //Table API
    val hashCode = new HashCode(23)
    val resultTable = sensorTable.select('id, 'ts, hashCode('id))

    //sql
    //需要先注册
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("hashCode", hashCode)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id,ts,hashCode(id) from sensor
        |""".stripMargin
    )

    resultTable.toAppendStream[Row].print("table")
    resultSqlTable.toAppendStream[Row].print("sql")
    env.execute()

  }
}

结果展示

image-20220331114311536

跳转顶部


表函数

与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入 参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。

为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。求值方法的参数类型,决定表函数的所有有效参数

返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 protected collect(T)方 法发出输出行

Table API 中,Table 函数需要与.joinLateral.leftOuterJoinLateral 一起使用。

joinLateral 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它 的表达式)计算得到的所有行连接起来。

leftOuterJoinLateral算子,则是左外连接,它同样会将外部表中的每一行与表函数计 算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。

SQL 中,则需要使用 Lateral Table(),或者带有 ON TRUE 条件的左连 接


拆词并且统计字符个数

/**
 * 表函数,拆词并且统计字符个数
 */
class Split(separator: String) extends TableFunction[(String, Int)] {
  def eval(s: String) = {
    s.split(separator).foreach(
      word => collect((word, word.length))
    )
  }
}

函数的使用

    //Table API
    val split = new Split("_")
    val resultTable = sensorTable
      .joinLateral(split('id) as('word, 'length)) //侧向连接
      .select('id, 'ts, 'word, 'length)

    //sql
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("split", split)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select
        | id,ts,word,length
        |from
        | sensor,lateral table(split(id)) as splitid(word,length)
        |""".stripMargin
    )

image-20220331115512997

跳转顶部


聚合函数

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的 数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实 现的

image-20220331112325999

上图中显示了一个聚合的例子。

假设现在有一张表,包含了各种饮料的数据。该表由三列(idname price)、五行 组成数据。现在我们需要找到表中所有饮料的最高价格,即执行 max()聚合,结果将是一 个数值。

AggregateFunction 的工作原理如下。

  • 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过 调用 AggregateFunctioncreateAccumulator()方法创建空累加器。

  • 随后,对每个输入行调用函数的 accumulate()方法来更新累加器。

  • 处理完所有行后,将调用函数的 getValue()方法来计算并返回最终结果。 AggregationFunction 要求必须实现的方法:

  • createAccumulator()

  • accumulate()

  • getValue()

除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询 更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口 (session group window)的上下文中,则 merge()方法是必需的。

  • retract()

  • merge()

  • resetAccumulat


求每个传感器的平均温度

不要导错包

image-20220331120027076

创建

//定义一个类,用于表示聚合的状态
class AvgTempAcc {
  var sum: Double = 0.0
  var count: Int = 0
}

/**
 * 自定义一个聚合函数,求每个传感器的平均温度
 * 两个参数,一个是输入的数据,一个是保存在agg里面的状态
 */
class AvgTemp extends AggregateFunction[Double, AvgTempAcc] {
  //状态怎么计算
  override def getValue(acc: AvgTempAcc): Double = acc.sum / acc.count

  //创建初始的状态值
  override def createAccumulator(): AvgTempAcc = new AvgTempAcc

  //还要实现具体的处理计算函数
  def accumulate(acc: AvgTempAcc, temp: Double) = {
    acc.sum += temp
    acc.count += 1
  }
}

使用

    //Table API
    val avgTemp = new AvgTemp()
    val resultTable = sensorTable
      .groupBy('id)
      .aggregate(avgTemp('temperature) as 'avg_temp)
      .select('id, 'avg_temp)

    //sql
    //需要先注册
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("avgTemp", avgTemp)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id,avgTemp(temperature) from sensor
        |group by id
        |""".stripMargin
    )

image-20220331121536036

跳转顶部


表聚合函数

用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一 个表中数据,聚合为具有多行和多列的结果表。这跟 AggregateFunction 非常类似,只是之 前聚合结果是一个标量值,现在变成了一张表

image-20220331112706172

比如现在我们需要找到表中所有饮料的前 2 个最高价格,即执行 top2()表聚合。我 们需要检查 5 行中的每一行,得到的结果将是一个具有排序后前 2 个值的表

用户定义的表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的

TableAggregateFunction 的工作原理如下。

  • 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。 通过调用 TableAggregateFunction createAccumulator()方法可以创建空累加器。

  • 随后,对每个输入行调用函数的 accumulate()方法来更新累加器。

  • 处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。

AggregationFunction 要求必须实现的方法:

  • createAccumulator()

  • accumulate()

除了上述方法之外,还有一些可选择实现的方法。

  • retract()
  • merge()
  • resetAccumulator()
  • emitValue()
  • emitUpdateWithRetract()

定义创建

//定义一个类来表示聚合状态
class Top2TempAcc {
  var highestTemp: Double = Double.MinValue
  var secondTemp: Double = Double.MinValue
}

class Top2Temp extends TableAggregateFunction[(Double, Int), Top2TempAcc] {
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc()

  //实现计算聚合结果的函数
  def accumulate(acc: Top2TempAcc, temp: Double) = {
    //判断当前温度值的大小
    if (temp > acc.highestTemp) {
      acc.secondTemp = acc.highestTemp
      acc.highestTemp = temp
    } else if (temp > acc.secondTemp) {
      acc.secondTemp = temp
    }
  }

  //实现一个输出结果的方法,全部数据处理好后,才会输出
  def emitValue(acc: Top2TempAcc, out: Collector[(Double, Int)]): Unit = {
    out.collect(acc.highestTemp, 1)
    out.collect(acc.secondTemp, 2)
  }
}

只能在Table中实现,SQL中不能使用

    val top2Temp = new Top2Temp()
    val resultTable = sensorTable
      .groupBy('id)
      .flatAggregate(top2Temp('temperature) as('temp, 'rank))
      .select('id, 'temp, 'rank)

image-20220331154535652

跳转顶部

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-04 12:18:09  更:2022-04-04 12:20:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:57:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码