IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Blink 自定义函数(UDX) -> 正文阅读

[Java知识库]Blink 自定义函数(UDX)

Blink 开发Java自定义函数(UDX)

概述

注意事项

  • UDX函数仅适用于Blink,对开源Flink暂不适用。
  • 为了避免JAR依赖冲突,您需要注意以下几点:
    • 开发页面选择的Blink版本,请和Pom依赖Blink版本保持一致。
    • Blink相关依赖,scope请使用provided,即<scope>provided</scope>
    • 其他第三方依赖请采用Shade方式打包

UDX分类

实时计算Flink版支持以下3类自定义函数。

UDX分类描述
UDF(User Defined Scalar Function)用户自定义标量值函数,其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDAF(User Defined Aggregation Function)自定义聚合函数,其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。
UDTF(User Defined Table-valued Function)自定义表值函数,调用一次函数输出多行或多列数据。

注册使用

  1. 在资源引用中新建开发完成的jar包;

  2. 在作业的编辑窗口的顶部,输入自定义函数声明,示例如下。

    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
    

参数与返回值类型

实时计算Flink版数据类型Java类型
TINYINTjava.lang.Byte
SMALLINTjava.lang.Short
INTjava.lang.Integer
BIGINTjava.lang.Long
FLOATjava.lang.Float
DOUBLEjava.lang.Double
DECIMALjava.math.BigDecimal
BOOLEANjava.lang.Boolean
DATEjava.sql.Date
TIMEjava.sql.Time
TIMESTAMPjava.sql.Timestamp
CHARjava.lang.Character
STRINGjava.lang.String
VARBINARYjava.lang.byte[]
ARRAY暂不支持
MAP暂不支持

获取自定义函数参数

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项可以通过此对象来传递。

假设,您需要在作业中添加如下两个参数。

testKey1=lincoln
test.key2=todd

以UDTF为例,在Open方法中通过context.getJobParameter即可获取,示例如下。

public void open(FunctionContext context) throws Exception {
      String key1 = context.getJobParameter("testKey1", "empty");
      String key2 = context.getJobParameter("test.key2", "empty");
      System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}

自定义标量函数(UDF)

定义

自定义标量函数(UDF)将0个、1个或多个标量值映射到一个新的标量值。

业务代码

UDF需要在ScalarFunction类中实现eval方法。open方法和close方法可选。

注意 UDF默认对于相同的输入会有相同的输出。如果UDF不能保证相同的输出,例如,在UDF中调用外部服务,相同的输入值可能返回不同的结果,建议您使用override isDeterministic()方法,返回False。否则在某些条件下,输出结果不符合预期。例如,UDF算子前移。

示例代码如下。

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class StringLengthUdf extends ScalarFunction {
    // 可选,open方法可以不写。
    // 如果编写open方法需要声明'import org.apache.flink.table.functions.FunctionContext;'。
    @Override
    public void open(FunctionContext context) {
        }
    public long eval(String a) {
        return a == null ? 0 : a.length();
    }
    public long eval(String b, String c) {
        return eval(b) + eval(c);
    }
    //可选,close方法可以不写。
    @Override
    public void close() {
        }
}

编写SQL语句

在指定的Class中编写SQL语句,自定义函数中SQL语句示例如下。

-- udf str.length()
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

create table sls_stream(
    a int,
    b int,
    c varchar
) with (
    type='sls',
    endPoint='<yourEndpoint>',
    accessKeyId='<yourAccessId>',
    accessKeySecret='<yourAccessSecret>',
    startTime = '2017-07-04 00:00:00',
    project='<yourProjectName>',
    logStore='<yourLogStoreName>',
    consumerGroup='consumerGroupTest1'
);

create table rds_output(
    id int,
    len bigint,
    content VARCHAR
) with (
    type='rds',
    url='yourDatabaseURL',
    tableName='<yourDatabaseTableName>',
    userName='<yourDatabaseUserName>',
    password='<yourDatabasePassword>'
);

insert into rds_output
select
    a,
    stringLengthUdf(c),
    c as content
from sls_stream;

自定义聚合函数(UDAF)

定义

自定义聚合函数(UDAF)可以将多条记录聚合成1条记录。

UDAF抽象类内部方法

AggregateFunction的核心接口方法,如下所示:

  • createAccumulator和getValue方法
/*
* @param <T> UDAF的输出结果的类型。
* @param <ACC> UDAF的accumulator的类型。accumulator是UDAF计算中用来存放计算中间结果的数据类型。您可以需要根据需要自行设计每个UDAF的accumulator。
*/
public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
/*
* 初始化AggregateFunction的accumulator。
* 系统在进行第一个aggregate计算之前,调用一次此方法。
*/
public ACC createAccumulator()/*
* 系统在每次aggregate计算完成后,调用此方法。
*/
public T getValue(ACC accumulator)}

说明

  • createAccumulator和getValue可以定义在AggregateFunction抽象类内。
  • UDAF必须包含1个accumulate方法。
  • accumulate方法

    public void accumulate(ACC accumulator, ...[用户指定的输入参数]...);
    

    说明

    • 您需要实现一个accumulate方法,来描述如何计算输入的数据,并更新数据到accumulator中。
    • accumulate方法的第一个参数必须是使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,runtime代码会把accumulator的历史状态和您指定的上游数据(支持任意数量,任意类型的数据)作为参数,一起传递给accumulate方法。
  • retract和merge方法

    createAccumulator、getValue和accumulate 3个方法一起使用,可以设计出一个最基本的UDAF。但是实时计算Flink版一些特殊的场景需要您提供retract和merge两个方法才能完成。

    通常,计算都是对无限流的一个提前的观测值(early firing)。既然有early firing,就会有对发出的结果的修改,这个操作叫作撤回(retract)。SQL翻译优化器会帮助您自动判断哪些情况下会产生撤回的数据,哪些操作需要处理带有撤回标记的数据。但是您需要实现一个retract方法来处理撤回的数据。

    public void retract(ACC accumulator, ...[您指定的输入参数]...);
    

    说明

    • retract方法是accumulate方法的逆操作。例如,实现Count功能的UDAF,在使用accumulate方法时,每来一条数据要加1;在使用retract方法时,就要减1。
    • 类似于accumulate方法,retract方法的第1个参数必须使用AggregateFunction的ACC类型的accumulator。在系统运行过程中,runtime代码会把accumulator的历史状态,和您指定的上游数据(任意数量,任意类型的数据)一起发送给retract计算。

    在实时计算Flink版中一些场景需要使用merge方法,例如session window。由于实时计算Flink版具有out of order的特性,后输入的数据有可能位于2个原本分开的session中间,这样就把2个session合为1个session。此时,需要使用merge方法把多个accumulator合为1个accumulator。

    public void merge(ACC accumulator, Iterable<ACC> its);
    

    说明

    • merge方法的第1个参数,必须是使用AggregateFunction的ACC类型的accumulator,而且第1个accumulator是merge方法完成之后,状态所存放的地方。
    • merge方法的第2个参数是1个ACC类型的accumulator遍历迭代器,里面有可能存在1个或多个accumulator。

编写业务逻辑代码

import org.apache.flink.table.functions.AggregateFunction;

public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
    //定义存放count UDAF状态的accumulator的数据的结构。
    public static class CountAccum {
        public long total;
    }

    //初始化count UDAF的accumulator。
    public CountAccum createAccumulator() {
        CountAccum acc = new CountAccum();
        acc.total = 0;
        return acc;
    }

    //getValue提供了如何通过存放状态的accumulator计算count UDAF的结果的方法。
    public Long getValue(CountAccum accumulator) {
        return accumulator.total;
    }

    //accumulate提供了如何根据输入的数据更新count UDAF存放状态的accumulator。
    public void accumulate(CountAccum accumulator, Object iValue) {
        accumulator.total++;
    }

    public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
         for (CountAccum other : its) {
            accumulator.total += other.total;
         }
    }
}

示例

-- UDAF计算count
CREATE FUNCTION countUdaf AS 'com.hjc.test.blink.sql.udx.CountUdaf';

create table sls_stream(
   a int,
   b bigint,
   c varchar
) with (
   type='sls',
   endPoint='yourEndpoint',
   accessKeyId='yourAccessId',
   accessKeySecret='yourAccessSecret',
   startTime='2017-07-04 00:00:00',
   project='<yourPorjectName>',
   logStore='stream-test2',
   consumerGroup='consumerGroupTest3'
);

create table rds_output(
   len1 bigint,
   len2 bigint
) with (
   type='rds',
   url='yourDatabaseURL',
   tableName='<yourDatabaseTableName>',
   userName='<yourDatabaseUserName>',
   password='<yourDatabasePassword>'
);

insert into rds_output
select
   count(a),
   countUdaf(a)
from sls_stream;

自定义表值函数(UDTF)

定义

与自定义的标量函数类似,自定义的表值函数(UDTF)将0个、1个或多个标量值作为输入参数(可以是变长参数)。与标量函数不同,表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。

编写业务逻辑代码

UDTF需要在TableFunction类中实现eval方法。open方法和close方法可选。示例代码如下:

package com.hjc.test.blink.sql.udx;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;

public class SplitUdtf extends TableFunction<String> {

    // 可选,open方法可不编写。如果编写,则需要添加声明'import org.apache.flink.table.functions.FunctionContext;'。
    @Override
    public void open(FunctionContext context) {
        // ... ...
        }

    public void eval(String str) {
        String[] split = str.split("\\|");
        for (String s : split) {
            collect(s);
        }
    }

    // 可选,close方法可不编写。
    @Override
    public void close() {
        // ... ...
        }
}

多行返回

UDTF可以通过多次调用collect()实现将1行的数据转为多行返回。

多列返回

UDTF不仅可以进行1行转多行,还可以1列转多列。如果您需要UDTF返回多列,只需要将返回值声明成Tuple或Row。Tuple或Row解释如下:

  • 返回值为Tuple

    实时计算Flink版支持使用Tuple1到Tuple25 ,定义1个字段到25个字段。用Tuple3来返回3个字段的UDTF示例如下。

    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.table.functions.TableFunction;
    
    // 使用Tuple作为返回值,一定要显式声明Tuple的泛型类型, 例如,String、Long和Integer。
    public class ParseUdtf extends TableFunction<Tuple3<String, Long, Integer>> {
    
    public void eval(String str) {
    String[] split = str.split(",");
    // 以下代码仅作示例,实际业务需要添加更多的校验逻辑。
    String first = split[0];
    long second = Long.parseLong(split[1]);
    int third = Integer.parseInt(split[2]);
    Tuple3<String, Long, Integer> tuple3 = Tuple3.of(first, second, third);
    collect(tuple3);
    }
    }
    

    说明 使用Tuple时,字段值不能为null,且最多只能存在25个字段。

  • 返回值为Row

    使用Row来实现返回3个字段的UDTF示例如下。

    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.DataTypes;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    public class ParseUdtf extends TableFunction<Row> {
    
    public void eval(String str) {
    String[] split = str.split(",");
    String first = split[0];
    long second = Long.parseLong(split[1]);
    int third = Integer.parseInt(split[2]);
    Row row = new Row(3);
    row.setField(0, first);
    row.setField(1, second);
    row.setField(2, third);
    collect(row);
    }
    
    @Override
    // 如果返回值是Row,则必须重载实现getResultType方法,显式地声明返回的字段类型。
    public DataType getResultType(Object[] arguments, Class[] argTypes) {
    return DataTypes.createRowType(DataTypes.STRING, DataTypes.LONG, DataTypes.INT);
    }
    }
    

    说明 Row的字段值可以是null,但如果需要使用Row,必须重载实现getResultType方法。

SQL语法

UDTF支持cross join和left join,在使用UDTF时需要添加lateraltable关键字。以ParseUdtf为例,需要先注册一个Function名字。

CREATE FUNCTION parseUdtf AS 'com.alibaba.blink.sql.udtf.ParseUdtf';
  • cross join

    左表的每一行数据都会关联上UDTF产出的每一行数据,如果UDTF不产出任何数据,则这1行不会输出。

    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S,
    lateral table(parseUdtf(content)) as T(a, b, c);
    
  • left join

    左表的每一行数据都会关联上UDTF产出的每一行数据,如果UDTF不产出任何数据,则这1行的UDTF的字段会用null值填充。

    select S.id, S.content, T.a, T.b, T.c
    from input_stream as S
    left join lateral table(parseUdtf(content)) as T(a, b, c) on true;
    

    说明 left join UDTF语句后面必须添加on true参数。

UDTF示例

-- UDTF str.split("\\|");
create function splitUdtf as 'com.hjc.test.blink.sql.udx.SplitUdtf';

create table sls_stream(
a INT,
b BIGINT,
c VARCHAR
) with (
type='sls',
endPoint='yourEndpoint',
accessKeyId='yourAccessKeyId',
accessKeySecret='yourAccessSecret',
startTime = '2017-07-04 00:00:00',
project='yourProjectName',
logStore='yourLogStoreName',
consumerGroup='consumerGroupTest2'
);

-- 将c字段传入splitUdtf,切分后得到多行1列的表T(s)。s表示字段名字。
create view v1 as
select a,b,c,s
from sls_stream,
lateral table(splitUdtf(c)) as T(s);

create table rds_output(
id INT,
len BIGINT,
content VARCHAR
) with (
type='rds',
url='yourDatabaseURL',
tableName='yourDatabaseTableName',
userName='yourDatabaseUserName',
password='yourDatabasePassword'
);

insert into rds_output
select
a,b,s
from v1;
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-04-06 16:04:04  更:2022-04-06 16:06:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:31:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码