IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 函数 -> 正文阅读

[大数据]Flink 函数

函数

一、系统内置函数

  • 比较函数:
    SQL:
    value1 = value2
    value1 > value2

    Table API:
    ANY1 === ANY2
    ANY1 > ANY2

  • 逻辑函数:
    SQL:
    boolean1 OR boolean2
    boolean IS FALSE
    NOT boolean

    Table API:
    BOOLEAN1 || BOOLEAN2
    BOOLEAN.isFalse
    !BOOLEAN

  • 算数函数:
    SQL:
    numeric1 + numeric2
    POWER(numeric1, numeric2)

    Table API:
    NUMERIC1 + NUMERIC2
    NUMERIC1.power(NUMERIC2)

  • 字符串函数:
    SQL:
    string1 || string2
    UPPER(string)
    CHAR_LENGTH(string)

    Table API:
    STRING1 + STRING2
    STRING.upperCase()
    STRING.charLength()

  • 聚合函数:
    SQL:
    COUNT(*)
    SUM([ ALL | DISTINCT ] expression)
    RANK()
    ROW_NUMBER()

    Table API:
    FIELD.count
    FIELD.sum0

  • 时间函数:
    SQL:
    DATE string
    TIMESTAMP string
    CURRENT_TIME
    INTERVAL string range

    Table API:
    STRING.toDate
    STRING.toTimestamp
    currentTime()
    NUMERIC.days
    NUMERIC.minutes

二、用户自定义函数 UDF

他们显著的扩展了查询的表达能力,一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现;

1.聚合函数

用户自定义的聚合函数,可以将一个表中的数据,聚合成一个标量值。是通过继承 AggregateFunction 抽象类实现的:
AggregateFunction 的工作原理如下。

  • 首先,他需要一个累加器,用来保存聚合中间结果的数据结构(状态),可以通过调用AggregateFunction 的 createAccumulator()创建空的累加器;
  • 随后,对每一个输入行调用函数的 accumulate()方法 来更新累加器;
  • 处理完所有行之后,将调用 getValue()方法来计算并返回最终结果。
    AggregateFunction 要求必须实现的方法:
    ? createAccumulator()
    ? accumulate()
    ? getValue()

除开上述方法之后,还有一些可以实现的方法,可以让系统执行查询更有效率,而另外一些方法,对于某些场景是必须的,比如,聚合函数应该在应用窗口的上下文中,则需要实现merge()方法。

? retract()
? merge()
? resetAccumulator()

我们写一个自定义 AggregateFunction,计算一下每个 sensor 的平均温度值。

// 定义 AggregateFunction 的 Accumulator
public static class AvgTempAcc {
 double sum = 0.0;
 int count = 0;
}
// 自定义一个聚合函数,求每个传感器的平均温度值,保存状态(tempSum, tempCount)
public static class AvgTemp extends AggregateFunction<Double, AvgTempAcc>{
 @Override
 public Double getValue(AvgTempAcc accumulator) {
 return accumulator.sum / accumulator.count;
 }
 @Override
 public AvgTempAcc createAccumulator() {
 return new AvgTempAcc();
 }
 // 实现一个具体的处理计算函数,accumulate
 public void accumulate( AvgTempAcc accumulator, Double temp) {
 accumulator.sum += temp;
 accumulator.count += 1;
 }
}

2.表聚合函数(Table Aggregate Functions)

用户自定义表聚合函数,可以将表中的数据,聚合成具有多行和多列的结果表,这跟AggregateFunction 比较类似,只是之前聚合的结果是一个标量值,现在变成了一张表;
用户定义的表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。

TableAggregateFunction的工作原理如下:

  • 首先,他需要一个累加器( Accumulator )他是保存聚合中间结果 数据结构;通过调用TableAggregateFunction 的 createAccumulator() 方法可以创建空累加器;
  • 随后,对每一个输入行调用函数 accumulate() 方法来更新累嘉器;
  • 处理完所有行后,将调用函数 emitValue() 方法来计算并且返回最终的结果;
    AggregationFunction 要求必须实现的方法:
    ? createAccumulator()
    ? accumulate()
    除了上述方法之外,还有一些可选择实现的方法。
    ? retract()
    ? merge()
    ? resetAccumulator()
    ? emitValue()
    ? emitUpdateWithRetract()
// 先定义一个 Accumulator
public static class Top2TempAcc {
 double highestTemp = Double.MIN_VALUE;
 double secondHighestTemp = Double.MIN_VALUE;
}
// 自定义表聚合函数
public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, 
Integer>, Top2TempAcc> {
 @Override
 public Top2TempAcc createAccumulator() {
 return new Top2TempAcc();
 }
 // 实现计算聚合结果的函数 accumulate
 public void accumulate(Top2TempAcc acc, Double temp) {
 if (temp > acc.highestTemp) {
 acc.secondHighestTemp = acc.highestTemp;
 acc.highestTemp = temp;
 } else if (temp > acc.secondHighestTemp) {
 acc.secondHighestTemp = temp;
 }
 }
 // 实现一个输出结果的方法,最终处理完表中所有数据时调用
 public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> 
out) {
 out.collect(new Tuple2<>(acc.highestTemp, 1));
 out.collect(new Tuple2<>(acc.secondHighestTemp, 2));
 }
}

2.窗口(Windows)

时间语义,要配合窗口才可以发挥作用,最主要的用途:就是开窗口,根据时间字段做计算。
在 Table API 和 SQL 中,有两种窗口:Group Windows 和 Over Windows。

2.1 分组窗口(Group Windows)

分组窗口(Group Windows)将会根据 时间或者行计数间隔,将行聚合到有限的组(Group)中,对每个组的数据执行一次聚合函数;
Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由as 子句指定一个 别名,为了按照窗口对表进行分组,窗口的别名必须在group by子句中,向常规的分组字段一样引用;
Table API 提供了 一组具有特定语义的的预定义 Window 类 ,这些类将会被转换为底层 DataStream 或 DataSet 的窗口操作。Table API 支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)。

2.1 滚动窗口(Tumbling windows)

滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法,
over: 定义窗口长度
on: 用分组(按照时间间隔)或者排序(按照行数)的时间字段
as:别名,必须出现哎后面的group 中

// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
// Tumbling Processing-time Window
.window(Tumble.over("10.minutes").on("proctime").as("w"))
// Tumbling Row-count Window
.window(Tumble.over("10.rows").on("proctime").as("w")

2.1 滑动窗口(Sliding windows)

滑动窗口(Sliding windows)要用 Slide 类来定义,另外还有四个方法,
over: 定义窗口长度
every:定义滑动步长
on: 用分组(按照时间间隔)或者排序(按照行数)的时间字段
as:别名,必须出现哎后面的group 中

// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
// Sliding Processing-time window 
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
// Sliding Row-count window
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))

2.2 Over Windows

Over Windows 聚合标准 SQL 中已有 的(Over 子句),可以查询 select 子句 中定义,Over window 聚合,

// Session Event-time Window
.window(Session.withGap.("10.minutes").on("rowtime").as("w"))
// Session Processing-time Window
.window(Session.withGap.("10.minutes").on(“proctime").as("w"))
  • 首先,他需要一个累加器( Accumulator )他是保存聚合中间结果 数据结构;通过调用TableAggregateFunction 的 createAccumulator() 方法可以创建空累加器;
  • 随后,对每一个输入行调用函数 accumulate() 方法来更新累嘉器;
  • 处理完所有行后,将调用函数 emitValue() 方法来计算并且返回最终的结果;
    AggregationFunction 要求必须实现的方法:
    ? createAccumulator()
    ? accumulate()
    除了上述方法之外,还有一些可选择实现的方法。
    ? retract()
    ? merge()
    ? resetAccumulator()
    ? emitValue()
    ? emitUpdateWithRetract()
// 先定义一个 Accumulator
public static class Top2TempAcc {
 double highestTemp = Double.MIN_VALUE;
 double secondHighestTemp = Double.MIN_VALUE;
}
// 自定义表聚合函数
public static class Top2Temp extends TableAggregateFunction<Tuple2<Double, 
Integer>, Top2TempAcc> {
 @Override
 public Top2TempAcc createAccumulator() {
 return new Top2TempAcc();
 }
 // 实现计算聚合结果的函数 accumulate
 public void accumulate(Top2TempAcc acc, Double temp) {
 if (temp > acc.highestTemp) {
 acc.secondHighestTemp = acc.highestTemp;
 acc.highestTemp = temp;
 } else if (temp > acc.secondHighestTemp) {
 acc.secondHighestTemp = temp;
 }
 }
 // 实现一个输出结果的方法,最终处理完表中所有数据时调用
 public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>> 
out) {
 out.collect(new Tuple2<>(acc.highestTemp, 1));
 out.collect(new Tuple2<>(acc.secondHighestTemp, 2));
 }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:20:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:23:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码