0. Program Flow
- Create a table environment
- Create the tables used for input and output
  - Create an input table connected to an external system for reading data
  - Register an output table connected to an external system for writing results
- Query and transform the data
  - Execute SQL against a table to obtain a new table
  - Use the Table API to transform a table into a new table
- Write the result into the output table (a minimal skeleton is sketched below)
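A minimal sketch of this flow, assuming the built-in datagen and print connectors (the table and field names here are made up for illustration):
public class TableProgramSkeleton {
    public static void main(String[] args) {
        // 1. Create the table environment (streaming mode).
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // 2. Input table: 'datagen' generates random rows, handy for testing.
        tableEnv.executeSql(
                "CREATE TABLE InputTable (user_name STRING, score INT) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '1')");

        // 3. Output table: 'print' writes every row to stdout.
        tableEnv.executeSql(
                "CREATE TABLE OutputTable (user_name STRING, score INT) " +
                "WITH ('connector' = 'print')");

        // 4. Query with SQL (or with the Table API: tableEnv.from("InputTable")...).
        Table result = tableEnv.sqlQuery("SELECT user_name, score FROM InputTable");

        // 5. Write the result into the output table; this submits the job.
        result.executeInsert("OutputTable");
    }
}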
1. Creating a Table Environment
To use the Table API and SQL, you need a "table environment" (TableEnvironment). It is mainly responsible for:
- registering Catalogs and tables;
- executing SQL queries;
- registering user-defined functions (UDFs);
- converting between DataStream and Table.
There are four ways to create a table environment:
// 1. Old planner, streaming mode
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

// 2. Old planner, batch mode
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// 3. Blink planner, streaming mode
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// 4. Blink planner, batch mode
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
2. Creating Tables
Connector Tables (see the Table API Connectors documentation for the available connectors)
tableEnv.executeSql("CREATE [TEMPORARY] TABLE MyTable ... WITH ( 'connector' = ... )");
Kafka example:
CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)
Virtual Tables
Table newTable = tableEnv.sqlQuery("SELECT ... FROM MyTable... ");
tableEnv.createTemporaryView("NewTable", newTable);
3. Querying Tables
Querying with SQL
- Flink's SQL support is built on Apache Calcite, a framework that provides standard SQL parsing and planning for different compute platforms; many big-data frameworks, such as Apache Hive and Apache Kylin, implement their SQL support by integrating Calcite.
Table table = tableEnvironment.sqlQuery("select * from myTable");
Querying with the Table API
Table table = ...
Table tableApi = table
.select($("name"), $("time"))
.where($("name").isEqual("fzk"));
4. Output Tables
tableEnv.executeSql("CREATE TABLE OutputTable ... WITH ( 'connector' = ... )");
Table result = ...
result.executeInsert("OutputTable");
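The same write can also be expressed as a single SQL statement instead of executeInsert; a sketch with hypothetical column names:
// INSERT INTO submits the job directly, without a Table object in between.
tableEnv.executeSql("INSERT INTO OutputTable SELECT user_name, cnt FROM InputTable");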
5. Converting Between Tables and Streams
Converting a stream into a table
Calling fromDataStream()
- Note: an insert-only stream is converted with fromDataStream; a changelog (update) stream must be converted with fromChangelogStream instead.
Method signatures:
Table fromDataStream(DataStream<T> dataStream)
Table fromDataStream(DataStream<T> dataStream, Expression... fields)
Table table = tableEnvironment.fromDataStream(sourceData);
Table table = tableEnvironment.fromDataStream(sourceData, $("f0").as("name"), $("f1").as("time"));
Calling fromChangelogStream()
- Note: a changelog stream (containing updates) is converted into a table with fromChangelogStream.
Table table = tableEnvironment.fromChangelogStream(sourceData);
Calling createTemporaryView()
tableEnvironment.createTemporaryView("myTable", sourceData, $("f0").as("name"), $("f1").as("time"));
Converting a table into a stream
Calling toDataStream()
- Note: this works when the table is insert-only; if it contains update operations, call toChangelogStream() instead.
DataStream<Row> rowDataStream = tableEnvironment.toDataStream(table);
Calling toChangelogStream()
- Note: use toChangelogStream() when the table contains update operations.
DataStream<Row> rowDataStream = tableEnvironment.toChangelogStream(table);
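Putting both directions together, a minimal round-trip sketch (field names and sample data are made up):
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class StreamTableRoundTrip {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Insert-only stream of (name, score) pairs.
        DataStream<Tuple2<String, Integer>> source =
                env.fromElements(Tuple2.of("fzk", 1), Tuple2.of("abc", 2), Tuple2.of("fzk", 3));

        // Stream -> table, renaming the positional fields f0/f1.
        Table table = tableEnv.fromDataStream(source, $("f0").as("name"), $("f1").as("score"));

        // The aggregation emits updates, so convert back with toChangelogStream.
        Table totals = table.groupBy($("name")).select($("name"), $("score").sum().as("total"));
        tableEnv.toChangelogStream(totals).print();

        env.execute();
    }
}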
6. Data Types
| Data Type        | Remarks                                         |
|------------------|-------------------------------------------------|
| CHAR             |                                                 |
| VARCHAR          |                                                 |
| STRING           |                                                 |
| BOOLEAN          |                                                 |
| BYTES            | BINARY and VARBINARY are not supported yet.     |
| DECIMAL          | Supports fixed precision and scale.             |
| TINYINT          |                                                 |
| SMALLINT         |                                                 |
| INTEGER          |                                                 |
| BIGINT           |                                                 |
| FLOAT            |                                                 |
| DOUBLE           |                                                 |
| DATE             |                                                 |
| TIME             | Supports only a precision of 0.                 |
| TIMESTAMP        |                                                 |
| TIMESTAMP_LTZ    |                                                 |
| INTERVAL         | Supports only intervals of MONTH and SECOND(3). |
| ARRAY            |                                                 |
| MULTISET         |                                                 |
| MAP              |                                                 |
| ROW              |                                                 |
| RAW              |                                                 |
| structured types | Only exposed in user-defined functions yet.     |
| Tuple            |                                                 |
| POJO             |                                                 |
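These types can also be declared programmatically; a small sketch using the DataTypes / Schema builder API (the column names are made up):
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

// A hypothetical schema exercising a few of the types listed above.
Schema schema = Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("name", DataTypes.STRING())
        .column("price", DataTypes.DECIMAL(10, 2))
        .column("tags", DataTypes.ARRAY(DataTypes.STRING()))
        .column("ts", DataTypes.TIMESTAMP(3))
        .build();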
7. Time Attributes and Windows
Time attributes
Event time
Defining in DDL
- Note: the event-time column must be of type TIMESTAMP or TIMESTAMP_LTZ.
- TIMESTAMP: year-month-day hour:minute:second (e.g. 2020-04-15 20:13:40.564)
- TIMESTAMP_LTZ: a timestamp carrying local time zone information (TIMESTAMP WITH LOCAL TIME ZONE)
- Note: the watermark is declared on a column of type TIMESTAMP or TIMESTAMP_LTZ.
If the time is already a TIMESTAMP, use it directly:
CREATE TABLE EventTable(
  `user` STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);
If the time is a long epoch value, convert it to TIMESTAMP_LTZ first:
CREATE TABLE events (
  `user` STRING,
  url STRING,
  ts BIGINT,
  ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND
) WITH (
  ...
);
Defining during DataStream-to-table conversion
// Option 1: append the event-time attribute as an extra field at the end
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());
// Option 2: replace an existing field (here the first one) with the event-time attribute
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));
Processing time
Defining in DDL
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME()
) WITH (
  ...
);
Defining during DataStream-to-table conversion
DataStream<Tuple2<String, String>> stream = ...;
Table table = tEnv.fromDataStream(stream, $("user"), $("url"), $("ts").proctime());
Windows
Group windows (before Flink 1.13)
Tumble (tumbling window)
// Tumbling event-time window
table.window(Tumble.over(lit(10).minutes()).on($("rowtime")).as("w"));
// Tumbling processing-time window
table.window(Tumble.over(lit(10).minutes()).on($("proctime")).as("w"));
// Tumbling row-count window
table.window(Tumble.over(rowInterval(10)).on($("proctime")).as("w"));
Slide (sliding window)
// Sliding event-time window
table.window(Slide.over(lit(10).minutes())
    .every(lit(5).minutes())
    .on($("rowtime"))
    .as("w"));
// Sliding processing-time window
table.window(Slide.over(lit(10).minutes())
    .every(lit(5).minutes())
    .on($("proctime"))
    .as("w"));
// Sliding row-count window
table.window(Slide.over(rowInterval(10)).every(rowInterval(5)).on($("proctime")).as("w"));
Session (session window)
// Session event-time window
table.window(Session.withGap(lit(10).minutes()).on($("rowtime")).as("w"));
// Session processing-time window
table.window(Session.withGap(lit(10).minutes()).on($("proctime")).as("w"));
Windowing table-valued functions (Windowing TVFs, Flink 1.13 and later)
Example source table (Bid)
- A windowing TVF adds three columns to the query output (see the TUMBLE example below):
  - window_start
  - window_end
  - window_time
Flink SQL> desc Bid;
+---------+------------------------+------+-----+--------+---------------------------------+
|    name |                   type | null | key | extras |                       watermark |
+---------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true |     |        | `bidtime` - INTERVAL '1' SECOND |
|   price |         DECIMAL(10, 2) | true |     |        |                                 |
|    item |                 STRING | true |     |        |                                 |
+---------+------------------------+------+-----+--------+---------------------------------+
Flink SQL> SELECT * FROM Bid;
+------------------+-------+------+
|          bidtime | price | item |
+------------------+-------+------+
| 2020-04-15 08:05 |  4.00 |    C |
| 2020-04-15 08:07 |  2.00 |    A |
| 2020-04-15 08:09 |  5.00 |    D |
| 2020-04-15 08:11 |  3.00 |    B |
| 2020-04-15 08:13 |  1.00 |    E |
| 2020-04-15 08:17 |  6.00 |    F |
+------------------+-------+------+
TUMBLE (tumbling window)
SELECT * FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |             window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
HOP (hopping/sliding window)
SELECT * FROM TABLE(
HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |             window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:05 | 2020-04-15 08:15 | 2020-04-15 08:14:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:15 | 2020-04-15 08:25 | 2020-04-15 08:24:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
CUMULATE (cumulative window)
SELECT * FROM TABLE(
CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
);
+------------------+-------+------+------------------+------------------+-------------------------+
|          bidtime | price | item |     window_start |       window_end |             window_time |
+------------------+-------+------+------------------+------------------+-------------------------+
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:05 |  4.00 |    C | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:08 | 2020-04-15 08:07:59.999 |
| 2020-04-15 08:07 |  2.00 |    A | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:09 |  5.00 |    D | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:12 | 2020-04-15 08:11:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:11 |  3.00 |    B | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:14 | 2020-04-15 08:13:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:16 | 2020-04-15 08:15:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:13 |  1.00 |    E | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
| 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:18 | 2020-04-15 08:17:59.999 |
| 2020-04-15 08:17 |  6.00 |    F | 2020-04-15 08:10 | 2020-04-15 08:20 | 2020-04-15 08:19:59.999 |
+------------------+-------+------+------------------+------------------+-------------------------+
8. Aggregate Queries
Group aggregation
Standard SQL GROUP BY aggregation: in a continuous query, the result row for each group is updated as new data arrives (a sketch follows).
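A minimal sketch, assuming a registered table clicks with columns user_name and url:
// One continuously-updated count per user.
Table userClicks = tableEnv.sqlQuery(
        "SELECT user_name, COUNT(url) AS cnt FROM clicks GROUP BY user_name");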
Window aggregation
- Group by the window columns produced by the windowing TVF (window_start, window_end, window_time):
SELECT window_start, window_end, SUM(price)
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
Over aggregation
- Standard SQL also supports a special kind of aggregation that computes an aggregate value for every input row. For example, for each row we can compute the average of all rows within the preceding hour, or the average of the preceding 10 rows. It is as if a window were opened over each row to collect data for the computation, hence the name "over window" (window/analytic function).
- Over aggregation differs fundamentally from the previous two kinds: group aggregation and windowing-TVF aggregation are "many-to-one", producing one result row per group, whereas an over aggregation is evaluated once per input row, so the number of rows does not shrink; it is "many-to-many".
- Usage (a concrete sketch follows):
SELECT
  <aggregate function> OVER (
    [PARTITION BY <field1>[, <field2>, ...]]
    ORDER BY <time attribute field>
    <window range>),
  ...
FROM ...
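A concrete sketch against the Bid table described earlier: for each bid, the average price of the same item over the preceding hour of event time.
Table avgPerItem = tableEnv.sqlQuery(
        "SELECT item, price, " +
        "  AVG(price) OVER (" +
        "    PARTITION BY item " +
        "    ORDER BY bidtime " +
        "    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW" +
        "  ) AS avg_price " +
        "FROM Bid");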
9. Functions
"Functions" package up common data transformations so they can be invoked uniformly inside SQL queries.
Built-in functions
See the list of built-in functions in the Flink documentation.
User-defined functions
Overall usage
When the built-in functions cannot cover a business requirement, you write your own UDF. The overall workflow is:
- Write the user-defined function (see the per-category notes below)
- Register the function
- Call the function
  - In the Table API, invoke it via call()
  - In SQL, invoke it by name
// Register
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class);
// Call in the Table API
tableEnv.from("MyTable").select(call("MyFunction", $("myField")));
// Call in SQL
tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable");
Function categories
- UDFs currently fall into the following categories:
  - Scalar functions: map zero or more scalar values to one scalar value
  - Table functions: map zero or more scalar values to a table (multiple rows and columns)
  - Aggregate functions: reduce a table (multiple rows and columns) to a single scalar value
  - Table aggregate functions: transform one table into another table
Scalar functions
Map zero or more scalar values to one scalar value.
- Extend ScalarFunction
- Implement an evaluation method named eval; it must be declared public
public static class HashFunction extends ScalarFunction {
    // Accepts any data type and returns its hash code
    public int eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o) {
        return o.hashCode();
    }
}

env.createTemporarySystemFunction("HashFunction", HashFunction.class);
env.from("MyTable").select(call("HashFunction", $("myField")));
env.sqlQuery("SELECT HashFunction(myField) FROM MyTable");
Table functions
Map zero or more scalar values to a table (multiple rows and columns).
Writing
- Extend TableFunction
- Implement an evaluation method named eval; it must be declared public
Usage
- In the Table API, a table function is used with .joinLateral(...) or .leftOuterJoinLateral(...). The joinLateral operator (cross) joins each row of the outer table (the table on its left) with all rows produced by the table function (on its right). leftOuterJoinLateral does the same, but also preserves outer rows for which the table function returns zero rows.
- In SQL, use LATERAL TABLE(<TableFunction>) with JOIN, or with LEFT JOIN on the condition ON TRUE.
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public static class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        // Emit one row per whitespace-separated token
        for (String s : str.split(" ")) {
            collect(Row.of(s, s.length()));
        }
    }
}
env.createTemporarySystemFunction("SplitFunction", SplitFunction.class);
env
.from("MyTable")
.joinLateral(call("SplitFunction", $("myField")))
.select($("myField"), $("word"), $("length"));
env
.from("MyTable")
.leftOuterJoinLateral(call("SplitFunction", $("myField")))
.select($("myField"), $("word"), $("length"));
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable, LATERAL TABLE(SplitFunction(myField))");
env.sqlQuery(
"SELECT myField, word, length " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE");
env.sqlQuery(
"SELECT myField, newWord, newLength " +
"FROM MyTable " +
"LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON TRUE");
Aggregate functions
Reduce a table (multiple rows and columns) to a single scalar value.
- Extend AggregateFunction
- Methods that must be implemented:
  - createAccumulator(): creates an empty accumulator
  - accumulate(): called for every input row to update the accumulator
  - getValue(): called once all rows have been processed to compute and return the final result
- Methods required in certain scenarios:
  - retract(): required for aggregations on bounded OVER windows; retracts an input row from the accumulator
  - merge(): required for many batch aggregations and for session and tumbling window aggregations; it is also useful for optimizations, e.g. two-phase aggregation requires every AggregateFunction to implement merge()
  - resetAccumulator(): required for many batch aggregations; resets the accumulator
// Mutable accumulator holding the running weighted sum and total weight
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

// Weighted average: AggregateFunction<result type, accumulator type>
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    // Called for every input row
    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    // Removes a previously accumulated row (needed for bounded OVER windows)
    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    // Merges several accumulators (needed e.g. for two-phase aggregation)
    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        for (WeightedAvgAccum a : it) {
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    // Resets the accumulator (needed for some batch aggregations)
    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
tEnv.registerFunction("wAvg", new WeightedAvg());
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
Table aggregate functions
Transform one table into another table (multiple rows and columns).
- Extend TableAggregateFunction
- Methods that must be implemented:
  - createAccumulator(): creates an empty accumulator
  - accumulate(): called for every input row to update the accumulator
  - emitValue(): computes and emits the final results
- Methods required in certain scenarios:
  - retract(): required for aggregations on bounded OVER windows; retracts an input row from the accumulator
  - merge(): required for many batch aggregations and for streaming session and sliding window aggregations
  - resetAccumulator(): required for many batch aggregations; resets the accumulator
// Accumulator keeping the two largest values seen so far
public static class Top2Accum {
    public Integer first;
    public Integer second;
}

public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    // Called for every input row
    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    // Emits up to two result rows: (value, rank)
    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}
tEnv.registerFunction("top2", new Top2());
tab.groupBy("key")
.flatAggregate("top2(a) as (v, rank)")
.select("key, v, rank");
Maven Dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>