FlinkSQL-自定义表聚合函数TableAggregateFunction
什么是表聚合函数
表聚合,多对多,多行输入多行输出
- 用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAF),可以把一个表中数据,聚合为具有多行和多列的结果表
- 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的
表聚合函数的实现
TableAggregateFunction 要求必须实现的方法
createAccumulator() accumulate() emitValue()
表聚合函数的工作原理
- 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用
createAccumulator() 方法可以创建空累加器 - 随后,对每个输入行调用函数的
accumulate() 方法来更新累加器 - 处理完所有行后,将调用函数的
emitValue() 方法来计算并返回最终结果
代码实现
public static class Top2 extends TableAggregateFunction<Tuple2<Double, Integer>, Tuple2<Double,Double>> {
public void emitValue(Tuple2<Double, Double> acc, Collector<Tuple2<Double, Integer>> out) {
if (acc.f0 != Double.MIN_VALUE) {
out.collect(Tuple2.of(acc.f0, 1));
}
if (acc.f1 != Double.MIN_VALUE) {
out.collect(Tuple2.of(acc.f1, 2));
}
}
@Override
public Tuple2<Double, Double> createAccumulator() {
return new Tuple2<Double,Double>(Double.MIN_VALUE,Double.MIN_VALUE);
}
public void accumulate(Tuple2<Double,Double> acc, Double temp){
if (temp > acc.f0) {
acc.f1 = acc.f0;
acc.f0 = temp;
} else if (temp > acc.f1) {
acc.f1 = temp;
}
}
}
测试用例
Table sourceTable = tableEnv.fromDataStream(dataStream, "f0 as id, f1 as ts, f2 as temp,pt.proctime");
Top2 top2 = new Top2();
tableEnv.registerFunction("top2",top2);
Table resultTable = sourceTable.groupBy("id")
.flatAggregate("top2(temp) as (temp,rank)")
.select("id,temp,rank");
tableEnv.toRetractStream(resultTable, Row.class).print();
|