Flink Functions
1. Built-in Functions in the Flink Table API and SQL
The Flink Table API and SQL provide a set of built-in functions for data transformation.
Many of the functions supported in standard SQL are already implemented in both the Table API and SQL.
- Comparison functions
  - SQL: value1 = value2, value1 > value2
  - Table API: ANY1 === ANY2, ANY1 > ANY2
- Logical functions
  - SQL: boolean1 OR boolean2, boolean IS FALSE, NOT boolean
  - Table API: BOOLEAN1 || BOOLEAN2, BOOLEAN.isFalse, !BOOLEAN
- Arithmetic functions
- String functions
  - SQL: string1 || string2, UPPER(string), CHAR_LENGTH(string)
  - Table API: STRING1 + STRING2, STRING.upperCase(), STRING.charLength()
- Time functions
  - SQL: DATE string, TIMESTAMP string, CURRENT_TIME, INTERVAL string range
  - Table API: STRING.toDate, STRING.toTimestamp, currentTime(), NUMERIC.days
- Aggregate functions
  - SQL: COUNT(*), SUM(expression), RANK(), ROW_NUMBER()
  - Table API: FIELD.count, FIELD.sum
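2. User-Defined Functions (UDFs)
User-defined functions (UDFs) are an important feature because they significantly extend the expressiveness of queries.
Requirements that the built-in system functions cannot cover can be implemented as UDFs.
In most cases, a user-defined function must be registered before it can be used in a query.
A function is registered in the TableEnvironment by calling registerFunction(). Once registered, it is inserted into the TableEnvironment's function catalog, so that the Table API or SQL parser can recognize and interpret it correctly.
Two basic kinds of function are:
- A scalar function is similar to map: one input, one output.
- A table function is similar to flatMap: one input, many outputs.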
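The map/flatMap analogy can be illustrated with plain Java streams. This is only a sketch of the one-to-one versus one-to-many distinction; it uses no Flink classes, and the class and method names (MapVsFlatMap, lengths, parts) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy only; no Flink classes are involved.
class MapVsFlatMap {

    // One-to-one, like a scalar function: each input yields exactly one output.
    static List<Integer> lengths(List<String> ids) {
        return ids.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    // One-to-many, like a table function: each input may yield several outputs.
    static List<String> parts(List<String> ids) {
        return ids.stream()
                .flatMap(id -> Arrays.stream(id.split("_")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("sensor_1", "sensor_10");
        System.out.println(lengths(ids)); // [8, 9]
        System.out.println(parts(ids));   // [sensor, 1, sensor, 10]
    }
}
```

The first result has as many elements as the input, while the second has more, which is the same difference as between a scalar function and a table function.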
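2.1 Scalar Functions
A scalar function maps zero, one, or more scalar values to a new scalar value.
To define a scalar function, extend the base class ScalarFunction and implement an evaluation (eval) method.
The behavior of a scalar function is determined by its evaluation method, which must be declared public and named eval.
For example: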
package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class UDFTest1_Scalar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Read the sensor file and parse each line into a SensorReading
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], Long.valueOf(field[1]), Double.valueOf(field[2]));
        });

        // Convert the stream into a table
        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");

        // Register the scalar function in the table environment
        HashCode hashCode = new HashCode(23);
        tableEnv.registerFunction("hashcode", hashCode);

        // Call the function from the Table API
        Table resultTable = table.select("id, ts, hashcode(id)");

        // Call the function from SQL
        tableEnv.createTemporaryView("sensor", table);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");

        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        env.execute();
    }

    // Scalar UDF: returns the hash code of the id multiplied by a configurable factor
    public static class HashCode extends ScalarFunction {
        private int factor = 13;

        public HashCode(int factor) {
            this.factor = factor;
        }

        // The evaluation method must be public and named eval
        public int eval(String id) {
            return id.hashCode() * factor;
        }
    }
}
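Because a scalar function is just a class with a public eval method, its logic can be checked without starting a Flink job. The following is a minimal sketch under that assumption: HashCodeLogic is a hypothetical stand-in that copies the eval logic (hash code multiplied by the configured factor) and deliberately does not extend ScalarFunction, so it runs with the JDK alone.

```java
// Hypothetical stand-in for the HashCode UDF; it does not extend ScalarFunction,
// so it can be compiled and run without Flink on the classpath.
class HashCodeLogic {
    private final int factor;

    HashCodeLogic(int factor) {
        this.factor = factor;
    }

    // Same shape as a scalar eval method: one value in, one value out.
    int eval(String id) {
        return id.hashCode() * factor;
    }

    public static void main(String[] args) {
        HashCodeLogic fn = new HashCodeLogic(23);
        // "a".hashCode() is 97, so the result is 97 * 23
        System.out.println(fn.eval("a")); // 2231
    }
}
```

Note that int multiplication overflows silently, so for long ids the result may wrap around to a negative number.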
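2.2 Table Functions
Like a scalar function, a user-defined table function takes zero, one, or more scalar values as input parameters. Unlike a scalar function, it can return any number of rows as output rather than a single value.
To define a table function, extend TableFunction and implement an evaluation method.
As with scalar functions, the behavior of a table function is determined by its evaluation method, which must be public and named eval.
For example: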
package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

public class UDFTest2_Table {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the table function in the table environment
        Split split = new Split("_");
        tableEnv.registerFunction("split", split);

        // Read the sensor file and parse each line into a SensorReading
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
        tableEnv.createTemporaryView("sensor", table);

        // Table API: join each row with the rows produced by the table function
        Table resultTable = table.joinLateral("split(id) as (word, length)").select("id, ts, word, length");

        // SQL: the same lateral join
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
                "from sensor, lateral table(split(id)) as splitid(word, length)");

        tableEnv.toAppendStream(resultTable, Row.class).print("res");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");
        env.execute();
    }

    // Table UDF: splits a string by the separator and emits one (word, length) row per part
    public static class Split extends TableFunction<Tuple2<String, Integer>> {
        private String sep = ",";

        public Split(String sep) {
            this.sep = sep;
        }

        // No return value; each output row is emitted through collect()
        public void eval(String str) {
            for (String s : str.split(sep)) {
                collect(new Tuple2<>(s, s.length()));
            }
        }
    }
}
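The one-row-in, many-rows-out behavior of the split function can be sketched in plain Java. In this illustration SplitLogic is a hypothetical stand-in: a returned list replaces the collect() calls of TableFunction, and Map.Entry replaces Tuple2, so nothing here depends on Flink. The sketch also quotes the separator, since String.split treats its argument as a regular expression; that quoting is an addition of this sketch, not part of the example above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical stand-in for the split table function, runnable without Flink.
class SplitLogic {

    // Returns one (word, length) pair per part; the list plays the role of collect().
    static List<Map.Entry<String, Integer>> split(String str, String sep) {
        List<Map.Entry<String, Integer>> rows = new ArrayList<>();
        // Pattern.quote makes separators such as "." or "|" match literally
        for (String s : str.split(Pattern.quote(sep))) {
            rows.add(new SimpleEntry<>(s, s.length()));
        }
        return rows;
    }

    public static void main(String[] args) {
        // One input value produces two output rows
        System.out.println(split("sensor_1", "_")); // [sensor=6, 1=1]
    }
}
```

With the lateral join, each of these output rows is paired with the input row that produced it, which is why id and ts are repeated in the result.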
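2.3 Aggregate Functions
Aggregation is many-to-one, similar to the window aggregations covered earlier.
User-defined aggregate functions (UDAGGs) aggregate the rows of a table into a single scalar value.
A user-defined aggregate function is implemented by extending the abstract class AggregateFunction.
- Methods that an AggregateFunction must implement:
  - createAccumulator()
  - accumulate()
  - getValue()
- An AggregateFunction works as follows:
  - First, it needs an accumulator, a data structure that holds the intermediate result of the aggregation. The accumulator is initialized by calling createAccumulator().
  - Next, accumulate() is called for each input row to update the accumulator.
  - After all rows have been processed, getValue() is called to compute and return the final result.
For example: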
package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

public class UDFTest3_Aggra {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Read the sensor file and parse each line into a SensorReading
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // Register the aggregate function in the table environment
        AvgTemp avgTemp = new AvgTemp();
        tableEnv.registerFunction("avgTemp", avgTemp);

        Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        tableEnv.createTemporaryView("sensor", table);

        // Table API: two equivalent ways of calling the aggregate function
        Table result = table.groupBy("id").select("id, avgTemp(temp)");
        Table result2 = table
                .groupBy("id")
                .aggregate("avgTemp(temp) as avgtemp")
                .select("id, avgtemp");

        // SQL
        Table resultSql = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");

        // Aggregates update earlier results, so a retract stream is required
        tableEnv.toRetractStream(result2, Row.class).print("res");
        tableEnv.toRetractStream(resultSql, Row.class).print("sql");
        env.execute();
    }

    // Aggregate UDF: average temperature; the accumulator holds (sum, count)
    public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double, Integer>> {
        @Override
        public Double getValue(Tuple2<Double, Integer> value) {
            return value.f0 / value.f1;
        }

        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0, 0);
        }

        // Not declared in the base class, so there is no @Override; it must be public and named accumulate
        public void accumulate(Tuple2<Double, Integer> acc, Double value) {
            acc.f0 += value;
            acc.f1 += 1;
        }
    }
}
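The accumulator lifecycle described in this section (createAccumulator, then accumulate per row, then getValue) can be traced by hand in plain Java. This is a sketch of the call order only: AvgLifecycle and its Acc class are hypothetical names, the methods are static stand-ins rather than overrides of AggregateFunction, and returning null for an empty group is a choice made here for illustration.

```java
// Hypothetical plain-Java model of the aggregate lifecycle; no Flink classes are used.
class AvgLifecycle {

    // Accumulator: the intermediate state of the aggregation (running sum and count)
    static final class Acc {
        double sum;
        int count;
    }

    // Step 1: create an empty accumulator
    static Acc createAccumulator() {
        return new Acc();
    }

    // Step 2: called once per input row to update the accumulator
    static void accumulate(Acc acc, double value) {
        acc.sum += value;
        acc.count += 1;
    }

    // Step 3: derive the final result from the accumulator
    static Double getValue(Acc acc) {
        if (acc.count == 0) {
            return null; // no rows seen, so there is no average to report
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (double t : new double[] {1.0, 2.0, 3.0}) {
            accumulate(acc, t);
        }
        System.out.println(getValue(acc)); // 2.0
    }
}
```

In a streaming job the runtime keeps one accumulator per group key and may call getValue after every update, which is why the results are emitted as a retract stream.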
2.4 Table Aggregate Functions
User-defined table aggregate functions (UDTAGGs) aggregate the rows of a table into a result table with multiple rows and columns.
A user-defined table aggregate function is implemented by extending the abstract class TableAggregateFunction.
- Methods that a TableAggregateFunction must implement:
  - createAccumulator()
  - accumulate()
  - emitValue()
- A TableAggregateFunction works as follows:
  - First, it also needs an accumulator, a data structure that holds the intermediate result of the aggregation. An empty accumulator is created by calling createAccumulator().
  - Next, accumulate() is called for each input row to update the accumulator.
  - After all rows have been processed, emitValue() is called to compute and return the final result.
For example:
package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class UDFTest4_TableAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Read the sensor file, parse each line, and assign event-time timestamps and watermarks
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
                    @Override
                    public long extractTimestamp(SensorReading value) {
                        return value.getTimestamp() * 1000L;
                    }
                });

        // Register the table aggregate function in the table environment
        MyAggTable myAggTable = new MyAggTable();
        tableEnv.registerFunction("myAgg", myAggTable);

        Table table = tableEnv.fromDataStream(dataStream, "id, temperature as temp, timestamp.rowtime as ts");
        tableEnv.createTemporaryView("sensor", table);

        // Table API: flatAggregate emits several rows per group
        Table result = table.groupBy("id")
                .flatAggregate("myAgg(temp) as (temp, rank)")
                .select("id, temp, rank");

        tableEnv.toRetractStream(result, Row.class).print("res");
        env.execute();
    }

    // Accumulator: the two highest temperatures seen so far
    public static class AggTabTempAcc {
        public Double highestTemp;
        public Double secondHighestTemp;

        public AggTabTempAcc() {
            // Double.MIN_VALUE is the smallest positive double, so it would wrongly beat
            // zero and negative temperatures; start from negative infinity instead
            highestTemp = Double.NEGATIVE_INFINITY;
            secondHighestTemp = Double.NEGATIVE_INFINITY;
        }
    }

    // Table aggregate UDF: emits the top two temperatures as (temp, rank) rows
    public static class MyAggTable extends TableAggregateFunction<Tuple2<Double, Integer>, AggTabTempAcc> {
        @Override
        public AggTabTempAcc createAccumulator() {
            return new AggTabTempAcc();
        }

        // Called once per input row to update the accumulator
        public void accumulate(AggTabTempAcc acc, Double temp) {
            if (temp > acc.highestTemp) {
                acc.secondHighestTemp = acc.highestTemp;
                acc.highestTemp = temp;
            } else if (temp > acc.secondHighestTemp) {
                acc.secondHighestTemp = temp;
            }
        }

        // Emits the result rows through the collector
        public void emitValue(AggTabTempAcc acc, Collector<Tuple2<Double, Integer>> col) {
            col.collect(new Tuple2<>(acc.highestTemp, 1));
            col.collect(new Tuple2<>(acc.secondHighestTemp, 2));
        }
    }
}
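The top-two logic can also be exercised in plain Java to see how one group of input rows becomes several output rows. This is a sketch under stated assumptions: Top2Lifecycle is a hypothetical stand-in with static methods, a returned list of "value,rank" strings replaces the Collector, and skipping unfilled slots in emitValue is a choice made here rather than something the example above does. It initializes the accumulator to negative infinity, because Double.MIN_VALUE is the smallest positive double and would be larger than zero or negative readings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a top-2 table aggregate; no Flink classes are used.
class Top2Lifecycle {

    // Accumulator: the two largest values seen so far
    static final class Acc {
        double highest = Double.NEGATIVE_INFINITY;
        double second = Double.NEGATIVE_INFINITY;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    // Called once per input row; shifts the old maximum down when a new one arrives
    static void accumulate(Acc acc, double temp) {
        if (temp > acc.highest) {
            acc.second = acc.highest;
            acc.highest = temp;
        } else if (temp > acc.second) {
            acc.second = temp;
        }
    }

    // Emits up to two rows formatted as "value,rank"; slots never filled are skipped
    static List<String> emitValue(Acc acc) {
        List<String> rows = new ArrayList<>();
        if (acc.highest != Double.NEGATIVE_INFINITY) {
            rows.add(acc.highest + ",1");
        }
        if (acc.second != Double.NEGATIVE_INFINITY) {
            rows.add(acc.second + ",2");
        }
        return rows;
    }

    public static void main(String[] args) {
        Acc acc = createAccumulator();
        for (double t : new double[] {3.0, 5.0, 4.0}) {
            accumulate(acc, t);
        }
        System.out.println(emitValue(acc)); // [5.0,1, 4.0,2]
    }
}
```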