IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink1.15 SQL实现自定义UDF -> 正文阅读

[大数据]Flink1.15 SQL实现自定义UDF

摘要:

类似于Hive UDF,Flink SQL提供了丰富的函数类型来自定义函数,从而为Flink SQL统计分析复杂的数据格式提供了重要手段。

1 Flink SQL自定义函数分类

说到UDF函数,通过HiveSQL的人会想到UDF、UDAF、UDTF,在Flink Table API/SQL中没有可以提这几个概念,函数划分的会细一些,但是它们跟UDF、UDAF、UDTF有对应的关系。

2 Flink SQL自定义函数基本用法

2.1 注册和使用UDF

调用方式1:Table API以call函数内联方式调用(不需要注册)

?tEnv.from("users").select(call(SubstringFunction.class, $("introduction"), 5,13))

?.execute()

?.print();

调用方式式2:使用Table API,先注册再通过注册的名字调用

?//注册函数

?tEnv.createTemporarySystemFunction("SubstringFunction",SubstringFunction.class);

?// Table API使用:使用call函数调用已注册的UDF

?tEnv.from("users").select(call("SubstringFunction", $("introduction"), 5, 13))

?.execute()

?.print();

调用方式3:使用SQL,先注册再通过注册的名字调用

?tEnv.sqlQuery("SELECT SubstringFunction(introduction, 5, 13) FROM users")

?.execute()

?.print();

?}

2.2 UDF实现要点

1、基类

UDF需要继承对应的基类,例如ScalarFunction(该类必须声明为公共、非抽象、全局可访问的。因此,不允许使用非静态内部类或匿名类;必须有默认构造方法(因为Flink需要实例化并注册到catalog中)

2、eval方法

必须提供公共的、有明确定义的参数的eval方法(可以重载,可变参数,继承)

public static class SumFunction extends ScalarFunction {

?public Integer eval(Integer a, Integer b) {

?return a + b;

?}

?public Integer eval(String a, String b) {

?return Integer.valueOf(a) + Integer.valueOf(b);

?}

?public Integer eval(Double... d) {

?double result = 0;

?for (double value : d)

?result += value;

?return (int) result;

?}

}

3、使用运行时

UDF基类的open、close方法可以被覆盖,分别用于自定义UDF初始化和清理逻辑。

在open方法中,提供FunctionContext参数,通过它可以获取Runtime环境的各种信息:

3 Flink SQL 自定义UDF案例

3.1 UDF自定义

ScalarFunction(标量函数)一进一出(将0个/1个/多个标量值映射到一个新的标量值),继承ScalarFunction即可。

3.2 测试数据集

用户点击页面的测试数据集如下所示:

{"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"}

{"user":"Mary","url":"./prod?id=1","cTime":"2022-02-02 12:00:05"}

{"user":"Liz","url":"./home","cTime":"2022-02-02 12:01:00"}

{"user":"Bob","cTime":"2022-02-02 12:01:30"}

{"user":"Mary","url":"./prod?id=7","cTime":"2022-02-02 12:01:45"}

3.3 UDF调用方式1

通过Flink Table API 以call函数内联方式调用(不需要注册)UDF,具体实现代码如下所示:

public class FlinkSQLBaseScalarFunction {

? ? public static void main(String[] args) throws Exception {

? ? ? ? //1、获取Stream执行环境

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? //2、创建表执行环境

? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

? ? ? ? env.setParallelism(1);

? ? ? ? //3、读入数据

? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888);

? ? ? ? //4、流转换为动态表

? ? ? ? Table table = tEnv.fromDataStream(lines,

? ? ? ? ? ? ? ? $("line")

? ? ? ? );

? ? ? ? tEnv.createTemporaryView("clicklog",table);

? ? ? ? //调用方式1:以call函数内联方式调用(不需要注册)

? ? ? ? tEnv.from("clicklog")

.select(

? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "user"),

? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "url"),

? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "cTime"))

? ? ? ? ? ? .execute()

? ? ? ? ? ? .print();

? ? }

? ? /**

? ? ?* 最简单的标量函数:

? ? ?*/

? ? public static class JsonFunction extends ScalarFunction {

? ? ? ? public String eval(String line,String key) {

? ? ? ? ? ? JSONObject baseJson = new JSONObject(line);

? ? ? ? ? ? String value = "";

? ? ? ? ? ? if(baseJson.has(key)){

? ? ? ? ? ? ? ? return baseJson.getString(key);

? ? ? ? ? ? }

? ? ? ? ? ? return value;

? ? ? ? }

? ? }

}

3.4 UDF调用方式2

以Flink Table API方式, 先注册UDF再通过注册的名字调用UDF,具体实现代码如下所示:

public class FlinkSQLBaseScalarFunction {

? ? public static void main(String[] args) throws Exception {

? ? ? ? //1、获取Stream执行环境

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? //2、创建表执行环境

? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

? ? ? ? env.setParallelism(1);

? ? ? ? //3、读入数据

? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888);

? ? ? ? //4、流转换为动态表

? ? ? ? Table table = tEnv.fromDataStream(lines,

? ? ? ? ? ? ? ? $("line")

? ? ? ? );

? ? ? ? tEnv.createTemporaryView("clicklog",table);

? ? ? ? //调用方式2:先注册在通过注册的名字调用

//注册函数

tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);

//2.1Table API:使用call函数调用已注册的UDF

tEnv.from("clicklog")

.select(

call("JsonFunction",$("line"), "user"),

call("JsonFunction",$("line"), "url"),

call("JsonFunction",$("line"), "cTime"))

.execute()

.print();

? ? }

? ? /**

? ? ?* 最简单的标量函数:

? ? ?*/

? ? public static class JsonFunction extends ScalarFunction {

? ? ? ? public String eval(String line,String key) {

? ? ? ? ? ? JSONObject baseJson = new JSONObject(line);

? ? ? ? ? ? String value = "";

? ? ? ? ? ? if(baseJson.has(key)){

? ? ? ? ? ? ? ? return baseJson.getString(key);

? ? ? ? ? ? }

? ? ? ? ? ? return value;

? ? ? ? }

? ? }

}

3.5 UDF调用方式3

以Flink SQL方式, 先注册UDF再通过注册的名字调用UDF,具体实现代码如下所示:

public class FlinkSQLBaseScalarFunction {

? ? public static void main(String[] args) throws Exception {

? ? ? ? //1、获取Stream执行环境

? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

? ? ? ? //2、创建表执行环境

? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

? ? ? ? env.setParallelism(1);

? ? ? ? //3、读入数据

? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888);

? ? ? ? //4、流转换为动态表

? ? ? ? Table table = tEnv.fromDataStream(lines,

? ? ? ? ? ? ? ? $("line")

? ? ? ? );

? ? ? ? tEnv.createTemporaryView("clicklog",table);

? ? ? ? //调用方式2:先注册在通过注册的名字调用

//注册函数

tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class);

//2.2SQL使用:使用call函数调用已注册的UDF

Table result = tEnv.sqlQuery("SELECT JsonFunction(line, 'user'),JsonFunction(line, 'url'),JsonFunction(line, 'cTime') FROM clicklog");

result.execute().print();

? ? }

? ? /**

? ? ?* 最简单的标量函数:

? ? ?*/

? ? public static class JsonFunction extends ScalarFunction {

? ? ? ? public String eval(String line,String key) {

? ? ? ? ? ? JSONObject baseJson = new JSONObject(line);

? ? ? ? ? ? String value = "";

? ? ? ? ? ? if(baseJson.has(key)){

? ? ? ? ? ? ? ? return baseJson.getString(key);

? ? ? ? ? ? }

? ? ? ? ? ? return value;

? ? ? ? }

? ? }

}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-10 11:58:25  更:2022-05-10 11:59:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:44:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码