Blink 开发Java自定义函数(UDX)
概述
注意事项
- UDX函数仅适用于Blink,对开源Flink暂不适用。
- 为了避免JAR依赖冲突,您需要注意以下几点:
- 开发页面选择的Blink版本,请和Pom依赖Blink版本保持一致。
- Blink相关依赖,scope请使用provided,即
<scope>provided</scope> - 其他第三方依赖请采用Shade方式打包
UDX分类
实时计算Flink版支持以下3类自定义函数。
UDX分类 | 描述 |
---|
UDF(User Defined Scalar Function) | 用户自定义标量值函数,其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。 | UDAF(User Defined Aggregation Function) | 自定义聚合函数,其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。 | UDTF(User Defined Table-valued Function) | 自定义表值函数,调用一次函数输出多行或多列数据。 |
注册使用
-
在资源引用中新建开发完成的jar包; -
在作业的编辑窗口的顶部,输入自定义函数声明,示例如下。 CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
参数与返回值类型
实时计算Flink版数据类型 | Java类型 |
---|
TINYINT | java.lang.Byte | SMALLINT | java.lang.Short | INT | java.lang.Integer | BIGINT | java.lang.Long | FLOAT | java.lang.Float | DOUBLE | java.lang.Double | DECIMAL | java.math.BigDecimal | BOOLEAN | java.lang.Boolean | DATE | java.sql.Date | TIME | java.sql.Time | TIMESTAMP | java.sql.Timestamp | CHAR | java.lang.Character | STRING | java.lang.String | VARBINARY | java.lang.byte[] | ARRAY | 暂不支持 | MAP | 暂不支持 |
获取自定义函数参数
自定义函数中提供了可选的open(FunctionContext context) 方法,FunctionContext 具备参数传递功能,自定义配置项可以通过此对象来传递。
假设,您需要在作业中添加如下两个参数。
testKey1=lincoln
test.key2=todd
以UDTF为例,在Open方法中通过context.getJobParameter 即可获取,示例如下。
public void open(FunctionContext context) throws Exception {
String key1 = context.getJobParameter("testKey1", "empty");
String key2 = context.getJobParameter("test.key2", "empty");
System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
自定义标量函数(UDF)
定义
自定义标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。
业务代码
UDF需要在ScalarFunction类中实现eval 方法。open 方法和close 方法可选。
注意 UDF默认对于相同的输入会有相同的输出。如果UDF不能保证相同的输出,例如,在UDF中调用外部服务,相同的输入值可能返回不同的结果,建议您使用override isDeterministic() 方法,返回False 。否则在某些条件下,输出结果不符合预期。例如,UDF算子前移。
示例代码如下。
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;
public class StringLengthUdf extends ScalarFunction {
@Override
public void open(FunctionContext context) {
}
public long eval(String a) {
return a == null ? 0 : a.length();
}
public long eval(String b, String c) {
return eval(b) + eval(c);
}
@Override
public void close() {
}
}
编写SQL语句
在指定的Class中编写SQL语句,自定义函数中SQL语句示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
create table sls_stream(
a int,
b int,
c varchar
) with (
type='sls',
endPoint='<yourEndpoint>',
accessKeyId='<yourAccessId>',
accessKeySecret='<yourAccessSecret>',
startTime = '2017-07-04 00:00:00',
project='<yourProjectName>',
logStore='<yourLogStoreName>',
consumerGroup='consumerGroupTest1'
);
create table rds_output(
id int,
len bigint,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
insert into rds_output
select
a,
stringLengthUdf(c),
c as content
from sls_stream;
自定义聚合函数(UDAF)
定义
自定义聚合函数(UDAF)可以将多条记录聚合成1条记录。
UDAF抽象类内部方法
AggregateFunction的核心接口方法,如下所示:
- createAccumulator和getValue方法
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
public ACC createAccumulator();
public T getValue(ACC accumulator);
}
说明
- createAccumulator和getValue可以定义在AggregateFunction抽象类内。
- UDAF必须包含1个accumulate方法。
-
accumulate方法 public void accumulate(ACC accumulator, ...[用户指定的输入参数]...);
说明
- 您需要实现一个accumulate方法,来描述如何计算输入的数据,并更新数据到accumulator中。
- accumulate方法的第一个参数必须是使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,runtime代码会把accumulator的历史状态和您指定的上游数据(支持任意数量,任意类型的数据)作为参数,一起传递给accumulate方法。
-
retract和merge方法 createAccumulator、getValue和accumulate 3个方法一起使用,可以设计出一个最基本的UDAF。但是实时计算Flink版一些特殊的场景需要您提供retract和merge两个方法才能完成。 通常,计算都是对无限流的一个提前的观测值(early firing)。既然有early firing,就会有对发出的结果的修改,这个操作叫作撤回(retract)。SQL翻译优化器会帮助您自动判断哪些情况下会产生撤回的数据,哪些操作需要处理带有撤回标记的数据。但是您需要实现一个retract方法来处理撤回的数据。 public void retract(ACC accumulator, ...[您指定的输入参数]...);
说明
- retract方法是accumulate方法的逆操作。例如,实现Count功能的UDAF,在使用accumulate方法时,每来一条数据要加1;在使用retract方法时,就要减1。
- 类似于accumulate方法,retract方法的第1个参数必须使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,runtime代码会把accumulator的历史状态,和您指定的上游数据(任意数量,任意类型的数据)一起发送给retract计算。
在实时计算Flink版中一些场景需要使用merge方法,例如session window。由于实时计算Flink版具有out of order的特性,后输入的数据有可能位于2个原本分开的session中间,这样就把2个session合为1个session。此时,需要使用merge方法把多个accumulator合为1个accumulator。 public void merge(ACC accumulator, Iterable<ACC> its);
说明
- merge方法的第1个参数,必须是使用AggregateFunction的ACC类型的accumulator,而且第1个accumulator是merge方法完成之后,状态所存放的地方。
- merge方法的第2个参数是1个ACC类型的accumulator遍历迭代器,里面有可能存在1个或多个accumulator。
编写业务逻辑代码
import org.apache.flink.table.functions.AggregateFunction;
public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
public static class CountAccum {
public long total;
}
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
public Long getValue(CountAccum accumulator) {
return accumulator.total;
}
public void accumulate(CountAccum accumulator, Object iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}
示例
-- UDAF计算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';
create table sls_stream(
a int,
b bigint,
c varchar
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessId',
accessKeySecret='yourAccessSecret',
startTime='2017-07-04 00:00:00',
project='<yourPorjectName>',
logStore='stream-test2',
consumerGroup='consumerGroupTest3'
);
create table rds_output(
len1 bigint,
len2 bigint
) with (
type='rds',
url='yourDatabaseURL',
tableName='<yourDatabaseTableName>',
userName='<yourDatabaseUserName>',
password='<yourDatabasePassword>'
);
insert into rds_output
select
count(a),
countUdaf(a)
from sls_stream;
自定义表值函数(UDTF)
定义
与自定义的标量函数类似,自定义的表值函数(UDTF)将0个、1个或多个标量值作为输入参数(可以是变长参数)。与标量函数不同,表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。
编写业务逻辑代码
UDTF需要在TableFunction类中实现eval方法。open方法和close方法可选。示例代码如下:
package com.hjc.test.blink.sql.udx;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
public class SplitUdtf extends TableFunction<String> {
@Override
public void open(FunctionContext context) {
}
public void eval(String str) {
String[] split = str.split("\\|");
for (String s : split) {
collect(s);
}
}
@Override
public void close() {
}
}
多行返回
UDTF可以通过多次调用collect() 实现将1行的数据转为多行返回。
多列返回
UDTF不仅可以进行1行转多行,还可以1列转多列。如果您需要UDTF返回多列,只需要将返回值声明成Tuple或Row。Tuple或Row解释如下:
-
返回值为Tuple 实时计算Flink版支持使用Tuple1到Tuple25 ,定义1个字段到25个字段。用Tuple3来返回3个字段的UDTF示例如下。 import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.table.functions.TableFunction;
public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {
public void eval(String str) {
String[] split = str.split(",");
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
collect(tuple3);
}
}
说明 使用Tuple时,字段值不能为null,且最多只能存在25个字段。
-
返回值为Row 使用Row来实现返回3个字段的UDTF示例如下。 import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.DataTypes;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
public class ParseUdtf extends TableFunction<Row> {
public void eval(String str) {
String[] split = str.split(",");
String first = split[0];
long second = Long.parseLong(split[1]);
int third = Integer.parseInt(split[2]);
Row row = new Row(3);
row.setField(0, first);
row.setField(1, second);
row.setField(2, third);
collect(row);
}
@Override
public DataType getResultType(Object[] arguments, Class[] argTypes) {
return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
}
}
说明 Row的字段值可以是null,但如果需要使用Row,必须重载实现getResultType 方法。
SQL语法
UDTF支持cross join和left join,在使用UDTF时需要添加lateral 和table 关键字。以ParseUdtf 为例,需要先注册一个Function名字。
CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
-
cross join 左表的每一行数据都会关联上UDTF产出的每一行数据,如果UDTF不产出任何数据,则这1行不会输出。 select S.id, S.content, T.a, T.b, T.c
from input_stream as S,
lateral table(parseUdtf(content)) as T(a, b, c);
-
left join 左表的每一行数据都会关联上UDTF产出的每一行数据,如果UDTF不产出任何数据,则这1行的UDTF的字段会用null值填充。 select S.id, S.content, T.a, T.b, T.c
from input_stream as S
left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
说明 left join UDTF语句后面必须添加on true 参数。
UDTF示例
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';
create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);
create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);
insert into rds_output
select
a,b,s
from v1;
|