| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink1.15 SQL实现自定义UDF -> 正文阅读 |
|
[大数据]Flink1.15 SQL实现自定义UDF |
1 Flink SQL自定义函数分类说到UDF函数,通过HiveSQL的人会想到UDF、UDAF、UDTF,在Flink Table API/SQL中没有可以提这几个概念,函数划分的会细一些,但是它们跟UDF、UDAF、UDTF有对应的关系。 2 Flink SQL自定义函数基本用法2.1 注册和使用UDF 调用方式1:Table API以call函数内联方式调用(不需要注册) ?tEnv.from("users").select(call(SubstringFunction.class, $("introduction"), 5,13)) ?.execute() ?.print(); 调用方式式2:使用Table API,先注册再通过注册的名字调用 ?//注册函数 ?tEnv.createTemporarySystemFunction("SubstringFunction",SubstringFunction.class); ?// Table API使用:使用call函数调用已注册的UDF ?tEnv.from("users").select(call("SubstringFunction", $("introduction"), 5, 13)) ?.execute() ?.print(); 调用方式3:使用SQL,先注册再通过注册的名字调用 ?tEnv.sqlQuery("SELECT SubstringFunction(introduction, 5, 13) FROM users") ?.execute() ?.print(); ?} 2.2 UDF实现要点 1、基类 UDF需要继承对应的基类,例如ScalarFunction(该类必须声明为公共、非抽象、全局可访问的。因此,不允许使用非静态内部类或匿名类;必须有默认构造方法(因为Flink需要实例化并注册到catalog中) 2、eval方法 必须提供公共的、有明确定义的参数的eval方法(可以重载,可变参数,继承) public static class SumFunction extends ScalarFunction { ?public Integer eval(Integer a, Integer b) { ?return a + b; ?} ?public Integer eval(String a, String b) { ?return Integer.valueOf(a) + Integer.valueOf(b); ?} ?public Integer eval(Double... d) { ?double result = 0; ?for (double value : d) ?result += value; ?return (int) result; ?} } 3、使用运行时 UDF基类的open、close方法可以被覆盖,分别用于自定义UDF初始化和清理逻辑。 在open方法中,提供FunctionContext参数,通过它可以获取Runtime环境的各种信息: 3 Flink SQL 自定义UDF案例3.1 UDF自定义 ScalarFunction(标量函数)一进一出(将0个/1个/多个标量值映射到一个新的标量值),继承ScalarFunction即可。 3.2 测试数据集 用户点击页面的测试数据集如下所示: {"user":"Mary","url":"./home","cTime":"2022-02-02 12:00:00"} {"user":"Mary","url":"./prod?id=1","cTime":"2022-02-02 12:00:05"} {"user":"Liz","url":"./home","cTime":"2022-02-02 12:01:00"} {"user":"Bob","cTime":"2022-02-02 12:01:30"} {"user":"Mary","url":"./prod?id=7","cTime":"2022-02-02 12:01:45"} 3.3 UDF调用方式1 通过Flink Table API 以call函数内联方式调用(不需要注册)UDF,具体实现代码如下所示: public class FlinkSQLBaseScalarFunction { ? ? public static void main(String[] args) throws Exception { ? ? ? ? //1、获取Stream执行环境 ? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ? ? ? ? //2、创建表执行环境 ? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); ? ? ? ? env.setParallelism(1); ? ? ? ? //3、读入数据 ? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888); ? ? ? ? //4、流转换为动态表 ? ? ? ? Table table = tEnv.fromDataStream(lines, ? ? ? ? ? ? ? ? $("line") ? ? ? ? ); ? ? ? ? tEnv.createTemporaryView("clicklog",table); ? ? ? ? //调用方式1:以call函数内联方式调用(不需要注册) ? ? ? ? tEnv.from("clicklog") .select( ? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "user"), ? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "url"), ? ? ? ? ? ? ? ? call(JsonFunction.class, $("line"), "cTime")) ? ? ? ? ? ? .execute() ? ? ? ? ? ? .print(); ? ? } ? ? /** ? ? ?* 最简单的标量函数: ? ? ?*/ ? ? public static class JsonFunction extends ScalarFunction { ? ? ? ? public String eval(String line,String key) { ? ? ? ? ? ? JSONObject baseJson = new JSONObject(line); ? ? ? ? ? ? String value = ""; ? ? ? ? ? ? if(baseJson.has(key)){ ? ? ? ? ? ? ? ? return baseJson.getString(key); ? ? ? ? ? ? } ? ? ? ? ? ? return value; ? ? ? ? } ? ? } } 3.4 UDF调用方式2 以Flink Table API方式, 先注册UDF再通过注册的名字调用UDF,具体实现代码如下所示: public class FlinkSQLBaseScalarFunction { ? ? public static void main(String[] args) throws Exception { ? ? ? ? //1、获取Stream执行环境 ? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ? ? ? ? //2、创建表执行环境 ? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); ? ? ? ? env.setParallelism(1); ? ? ? ? //3、读入数据 ? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888); ? ? ? ? //4、流转换为动态表 ? ? ? ? Table table = tEnv.fromDataStream(lines, ? ? ? ? ? ? ? ? $("line") ? ? ? ? ); ? ? ? ? tEnv.createTemporaryView("clicklog",table); ? ? ? ? //调用方式2:先注册在通过注册的名字调用 //注册函数 tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class); //2.1Table API:使用call函数调用已注册的UDF tEnv.from("clicklog") .select( call("JsonFunction",$("line"), "user"), call("JsonFunction",$("line"), "url"), call("JsonFunction",$("line"), "cTime")) .execute() .print(); ? ? } ? ? /** ? ? ?* 最简单的标量函数: ? ? ?*/ ? ? public static class JsonFunction extends ScalarFunction { ? ? ? ? public String eval(String line,String key) { ? ? ? ? ? ? JSONObject baseJson = new JSONObject(line); ? ? ? ? ? ? String value = ""; ? ? ? ? ? ? if(baseJson.has(key)){ ? ? ? ? ? ? ? ? return baseJson.getString(key); ? ? ? ? ? ? } ? ? ? ? ? ? return value; ? ? ? ? } ? ? } } 3.5 UDF调用方式3 以Flink SQL方式, 先注册UDF再通过注册的名字调用UDF,具体实现代码如下所示: public class FlinkSQLBaseScalarFunction { ? ? public static void main(String[] args) throws Exception { ? ? ? ? //1、获取Stream执行环境 ? ? ? ? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ? ? ? ? //2、创建表执行环境 ? ? ? ? StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); ? ? ? ? env.setParallelism(1); ? ? ? ? //3、读入数据 ? ? ? ? DataStream<String> lines = env.socketTextStream("hadoop1", 8888); ? ? ? ? //4、流转换为动态表 ? ? ? ? Table table = tEnv.fromDataStream(lines, ? ? ? ? ? ? ? ? $("line") ? ? ? ? ); ? ? ? ? tEnv.createTemporaryView("clicklog",table); ? ? ? ? //调用方式2:先注册在通过注册的名字调用 //注册函数 tEnv.createTemporarySystemFunction("JsonFunction", JsonFunction.class); //2.2SQL使用:使用call函数调用已注册的UDF Table result = tEnv.sqlQuery("SELECT JsonFunction(line, 'user'),JsonFunction(line, 'url'),JsonFunction(line, 'cTime') FROM clicklog"); result.execute().print(); ? ? } ? ? /** ? ? ?* 最简单的标量函数: ? ? ?*/ ? ? public static class JsonFunction extends ScalarFunction { ? ? ? ? public String eval(String line,String key) { ? ? ? ? ? ? JSONObject baseJson = new JSONObject(line); ? ? ? ? ? ? String value = ""; ? ? ? ? ? ? if(baseJson.has(key)){ ? ? ? ? ? ? ? ? return baseJson.getString(key); ? ? ? ? ? ? } ? ? ? ? ? ? return value; ? ? ? ? } ? ? } } |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 6:44:12- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |