函数
一、系统内置函数
-
比较函数: SQL: value1 = value2 value1 > value2 Table API: ANY1 === ANY2 ANY1 > ANY2 -
逻辑函数: SQL: boolean1 OR boolean2 boolean IS FALSE NOT boolean Table API: BOOLEAN1 || BOOLEAN2 BOOLEAN.isFalse !BOOLEAN -
算数函数: SQL: numeric1 + numeric2 POWER(numeric1, numeric2) Table API: NUMERIC1 + NUMERIC2 NUMERIC1.power(NUMERIC2) -
字符串函数: SQL: string1 || string2 UPPER(string) CHAR_LENGTH(string) Table API: STRING1 + STRING2 STRING.upperCase() STRING.charLength() -
聚合函数: SQL: COUNT(*) SUM([ ALL | DISTINCT ] expression) RANK() ROW_NUMBER() Table API: FIELD.count FIELD.sum0 -
时间函数: SQL: DATE string TIMESTAMP string CURRENT_TIME INTERVAL string range Table API: STRING.toDate STRING.toTimestamp currentTime() NUMERIC.days NUMERIC.minutes
二、用户自定义函数 UDF
他们显著的扩展了查询的表达能力,一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现;
1.聚合函数
用户自定义的聚合函数,可以将一个表中的数据,聚合成一个标量值。是通过继承 AggregateFunction 抽象类实现的: AggregateFunction 的工作原理如下。
- 首先,他需要一个累加器,用来保存聚合中间结果的数据结构(状态),可以通过调用AggregateFunction 的 createAccumulator()创建空的累加器;
- 随后,对每一个输入行调用函数的 accumulate()方法 来更新累加器;
- 处理完所有行之后,将调用 getValue()方法来计算并返回最终结果。
AggregateFunction 要求必须实现的方法: ? createAccumulator() ? accumulate() ? getValue()
除开上述方法之后,还有一些可以实现的方法,可以让系统执行查询更有效率,而另外一些方法,对于某些场景是必须的,比如,聚合函数应该在应用窗口的上下文中,则需要实现merge()方法。
? retract()
? merge()
? resetAccumulator()
我们写一个自定义 AggregateFunction,计算一下每个 sensor 的平均温度值。
public static class AvgTempAcc {
double sum = 0.0;
int count = 0;
}
public static class AvgTemp extends AggregateFunction<Double, AvgTempAcc>{
@Override
public Double getValue(AvgTempAcc accumulator) {
return accumulator.sum / accumulator.count;
}
@Override
public AvgTempAcc createAccumulator() {
return new AvgTempAcc();
}
public void accumulate( AvgTempAcc accumulator, Double temp) {
accumulator.sum += temp;
accumulator.count += 1;
}
}
2.表聚合函数(Table Aggregate Functions)
用户自定义表聚合函数,可以将表中的数据,聚合成具有多行和多列的结果表,这跟AggregateFunction 比较类似,只是之前聚合的结果是一个标量值,现在变成了一张表; 用户定义的表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。
TableAggregateFunction的工作原理如下:
- 首先,他需要一个累加器( Accumulator )他是保存聚合中间结果 数据结构;通过调用TableAggregateFunction 的 createAccumulator() 方法可以创建空累加器;
- 随后,对每一个输入行调用函数 accumulate() 方法来更新累嘉器;
- 处理完所有行后,将调用函数 emitValue() 方法来计算并且返回最终的结果;
AggregationFunction 要求必须实现的方法: ? createAccumulator() ? accumulate() 除了上述方法之外,还有一些可选择实现的方法。 ? retract() ? merge() ? resetAccumulator() ? emitValue() ? emitUpdateWithRetract()
public static class Top2TempAcc {
double highestTemp = Double.MIN_VALUE;
double secondHighestTemp = Double.MIN_VALUE;
}
public static class Top2Temp extends TableAggregateFunction<Tuple2<Double,
Integer>, Top2TempAcc> {
@Override
public Top2TempAcc createAccumulator() {
return new Top2TempAcc();
}
public void accumulate(Top2TempAcc acc, Double temp) {
if (temp > acc.highestTemp) {
acc.secondHighestTemp = acc.highestTemp;
acc.highestTemp = temp;
} else if (temp > acc.secondHighestTemp) {
acc.secondHighestTemp = temp;
}
}
public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>>
out) {
out.collect(new Tuple2<>(acc.highestTemp, 1));
out.collect(new Tuple2<>(acc.secondHighestTemp, 2));
}
}
2.窗口(Windows)
时间语义,要配合窗口才可以发挥作用,最主要的用途:就是开窗口,根据时间字段做计算。 在 Table API 和 SQL 中,有两种窗口:Group Windows 和 Over Windows。
2.1 分组窗口(Group Windows)
分组窗口(Group Windows)将会根据 时间或者行计数间隔,将行聚合到有限的组(Group)中,对每个组的数据执行一次聚合函数; Table API 中的 Group Windows 都是使用.window(w:GroupWindow)子句定义的,并且必须由as 子句指定一个 别名,为了按照窗口对表进行分组,窗口的别名必须在group by子句中,向常规的分组字段一样引用; Table API 提供了 一组具有特定语义的的预定义 Window 类 ,这些类将会被转换为底层 DataStream 或 DataSet 的窗口操作。Table API 支持的窗口定义,和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)。
2.1 滚动窗口(Tumbling windows)
滚动窗口(Tumbling windows)要用 Tumble 类来定义,另外还有三个方法, over: 定义窗口长度 on: 用分组(按照时间间隔)或者排序(按照行数)的时间字段 as:别名,必须出现哎后面的group 中
.window(Tumble.over("10.minutes").on("rowtime").as("w"))
.window(Tumble.over("10.minutes").on("proctime").as("w"))
.window(Tumble.over("10.rows").on("proctime").as("w")
2.1 滑动窗口(Sliding windows)
滑动窗口(Sliding windows)要用 Slide 类来定义,另外还有四个方法, over: 定义窗口长度 every:定义滑动步长 on: 用分组(按照时间间隔)或者排序(按照行数)的时间字段 as:别名,必须出现哎后面的group 中
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
2.2 Over Windows
Over Windows 聚合标准 SQL 中已有 的(Over 子句),可以查询 select 子句 中定义,Over window 聚合,
.window(Session.withGap.("10.minutes").on("rowtime").as("w"))
.window(Session.withGap.("10.minutes").on(“proctime").as("w"))
- 首先,他需要一个累加器( Accumulator )他是保存聚合中间结果 数据结构;通过调用TableAggregateFunction 的 createAccumulator() 方法可以创建空累加器;
- 随后,对每一个输入行调用函数 accumulate() 方法来更新累嘉器;
- 处理完所有行后,将调用函数 emitValue() 方法来计算并且返回最终的结果;
AggregationFunction 要求必须实现的方法: ? createAccumulator() ? accumulate() 除了上述方法之外,还有一些可选择实现的方法。 ? retract() ? merge() ? resetAccumulator() ? emitValue() ? emitUpdateWithRetract()
public static class Top2TempAcc {
double highestTemp = Double.MIN_VALUE;
double secondHighestTemp = Double.MIN_VALUE;
}
public static class Top2Temp extends TableAggregateFunction<Tuple2<Double,
Integer>, Top2TempAcc> {
@Override
public Top2TempAcc createAccumulator() {
return new Top2TempAcc();
}
public void accumulate(Top2TempAcc acc, Double temp) {
if (temp > acc.highestTemp) {
acc.secondHighestTemp = acc.highestTemp;
acc.highestTemp = temp;
} else if (temp > acc.secondHighestTemp) {
acc.secondHighestTemp = temp;
}
}
public void emitValue(Top2TempAcc acc, Collector<Tuple2<Double, Integer>>
out) {
out.collect(new Tuple2<>(acc.highestTemp, 1));
out.collect(new Tuple2<>(acc.secondHighestTemp, 2));
}
}
|