在Flink中函数的使用,Flink为用户提供了许多内置函数 如果这些函数还满足不了你的需求,还可以自定义函数
内置函数
比较函数
一些简单的常用比较函数
比较功能 | 描述 |
---|
value1 = value2 | 如果value1等于value2,则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN 。 | value1 <> value2 | 如果value1不等于value2则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN。 | >,>=,<,<= | 如果value1大于(大于等于,小于,小于等于)value2,则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN 。 | value1 IS DISTINCT FROM value2 | 如果两个值不相等,则返回 TRUE。NULL 值在此处被视为相同。例如,1 IS DISTINCT FROM NULL返回 TRUE; NULL IS DISTINCT FROM NULL返回 FALSE。 | value is null(is not null) | 如果值为NULL(不为null),则返回 TRUE 。 |
更多参考官网: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
用户自定义函数
标量函数
用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。 为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。
定义函数
public static class ToUpperCase extends ScalarFunction {
public String eval(String s){
return s.toUpperCase();
}
}
使用函数
TableAPI
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> stream = env.fromElements("hello", "flink", "Hello");
Table table = tEnv.fromDataStream(stream, $("word"));
table.select(call(ToUpperCase.class, $("word")).as("word_upper")).execute().print();
tEnv.createTemporaryFunction("toUpper", ToUpperCase.class);
table.select(call("toUpper", $("word")).as("word_upper")).execute().print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> stream = env.fromElements("hello", "flink", "Hello");
Table table = tEnv.fromDataStream(stream, $("word"));
tEnv.createTemporaryFunction("toUpper", ToUpperCase.class);
tEnv.createTemporaryView("t_word", table);
tEnv.sqlQuery("select toUpper(word) word_upper from t_word").execute().print();
表值函数
跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。 要定义一个表值函数,你需要扩展 org.apache.flink.table.functions 下的 TableFunction,可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T) 方法来发送要输出的行。 在 Table API 中,表值函数是通过 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 来使用的。joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。 在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE() 的使用。
定义函数
@FunctionHint(output = @DataTypeHint("ROW(word string, len int)"))
public static class Split extends TableFunction<Row> {
public void eval(String line) {
if (line.length() == 0) {
return;
}
for (String s : line.split(",")) {
collect(Row.of(s, s.length()));
}
}
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> stream = env.fromElements("hello,flink,world", "aaa,bbbbb", "");
Table table = tEnv.fromDataStream(stream, $("line"));
table
.joinLateral(call(Split.class, $("line")))
.select($("line"), $("word"), $("len"))
.execute()
.print();
table
.leftOuterJoinLateral(call(Split.class, $("line")))
.select($("line"), $("word"), $("len"))
.execute()
.print();
tEnv.createTemporaryFunction("split", Split.class);
table
.joinLateral(call("split", $("line")))
.select($("line"), $("word"), $("len"))
.execute()
.print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<String> stream = env.fromElements("hello,flink,world", "aaa,bbbbb", "");
Table table = tEnv.fromDataStream(stream, $("line"));
tEnv.createTemporaryView("t_word", table);
tEnv.createTemporaryFunction("split", Split.class);
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word " +
"join lateral table(split(line)) on true").execute().print();
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word, " +
"lateral table(split(line))").execute().print();
tEnv.sqlQuery("select " +
" line, word, len " +
"from t_word " +
"left join lateral table(split(line)) on true").execute().print();
tEnv.sqlQuery("select " +
" line, new_word, new_len " +
"from t_word " +
"left join lateral table(split(line)) as T(new_word, new_len) on true").execute().print();
聚合函数
用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。
定义函数
public static class VcAvgAcc {
public Integer sum = 0;
public Long count = 0L;
}
public static class VcAvg extends AggregateFunction<Double, VcAvgAcc> {
@Override
public Double getValue(VcAvgAcc accumulator) {
return accumulator.sum * 1.0 / accumulator.count;
}
@Override
public VcAvgAcc createAccumulator() {
return new VcAvgAcc();
}
public void accumulate(VcAvgAcc acc, Integer vc) {
acc.sum += vc;
acc.count += 1L;
}
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<WaterSensor> waterSensorStream =
env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
table
.groupBy($("id"))
.select($("id"), call(VcAvg.class, $("vc")))
.execute()
.print();
tEnv.createTemporaryFunction("my_avg", VcAvg.class);
table
.groupBy($("id"))
.select($("id"), call("my_avg", $("vc")))
.execute()
.print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<WaterSensor> waterSensorStream =
env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
tEnv.createTemporaryView("t_sensor", table);
tEnv.createTemporaryFunction("my_avg", VcAvg.class);
tEnv.sqlQuery("select id, my_avg(vc) from t_sensor group by id").execute().print();
表值聚合函数
自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。
定义函数
public static class Top2Acc {
public Integer first = Integer.MIN_VALUE;
public Integer second = Integer.MIN_VALUE;
}
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Acc> {
@Override
public Top2Acc createAccumulator() {
return new Top2Acc();
}
public void accumulate(Top2Acc acc, Integer vc) {
if (vc > acc.first) {
acc.second = acc.first;
acc.first = vc;
} else if (vc > acc.second) {
acc.second = vc;
}
}
public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStreamSource<WaterSensor> waterSensorStream =
env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
Table table = tEnv.fromDataStream(waterSensorStream);
table
.groupBy($("id"))
.flatAggregate(call(Top2.class, $("vc")).as("v", "rank"))
.select($("id"), $("v"), $("rank"))
.execute()
.print();
tEnv.createTemporaryFunction("top2", Top2.class);
table
.groupBy($("id"))
.flatAggregate(call("top2", $("vc")).as("v", "rank"))
.select($("id"), $("v"), $("rank"))
.execute()
.print();
SQL
SQL 还不支持表值聚合函数
|