IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 函数(function) -> 正文阅读

[大数据]Flink 函数(function)

在Flink中函数的使用,Flink为用户提供了许多内置函数
如果这些函数还满足不了你的需求,还可以自定义函数

内置函数

比较函数

一些简单的常用比较函数

比较功能描述
value1 = value2如果value1等于value2,则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN 。
value1 <> value2如果value1不等于value2则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN。
>,>=,<,<=如果value1大于(大于等于,小于,小于等于)value2,则返回 TRUE ;如果value1或value2为 NULL,则返回 UNKNOWN 。
value1 IS DISTINCT FROM value2如果两个值不相等,则返回 TRUE。NULL 值在此处被视为相同。例如,1 IS DISTINCT FROM NULL返回 TRUE; NULL IS DISTINCT FROM NULL返回 FALSE。
value is null(is not null)如果值为NULL(不为null),则返回 TRUE 。

更多参考官网:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

用户自定义函数

标量函数

用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。
为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。

定义函数
// 定义一个可以把字符串转成大写标量函数
public static class ToUpperCase extends ScalarFunction {
    public String eval(String s){
        return s.toUpperCase();
    }
}
使用函数
TableAPI
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<String> stream = env.fromElements("hello", "flink", "Hello");

Table table = tEnv.fromDataStream(stream, $("word"));

// 1. table api 使用方式1: 不注册直接 inline 使用
table.select(call(ToUpperCase.class, $("word")).as("word_upper")).execute().print();
// 2. table api 使用方式2: 注册后使用
// 2.1 注册函数
tEnv.createTemporaryFunction("toUpper", ToUpperCase.class);
// 2.2 使用函数
table.select(call("toUpper", $("word")).as("word_upper")).execute().print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<String> stream = env.fromElements("hello", "flink", "Hello");

Table table = tEnv.fromDataStream(stream, $("word"));

// 1. 注册临时函数
tEnv.createTemporaryFunction("toUpper", ToUpperCase.class);
// 2. 注册临时表
tEnv.createTemporaryView("t_word", table);
// 3. 在临时表上使用自定义函数查询
tEnv.sqlQuery("select toUpper(word) word_upper from t_word").execute().print();
表值函数

跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。
要定义一个表值函数,你需要扩展 org.apache.flink.table.functions 下的 TableFunction,可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T) 方法来发送要输出的行。
在 Table API 中,表值函数是通过 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 来使用的。joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。
在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE() 的使用。

定义函数
@FunctionHint(output = @DataTypeHint("ROW(word string, len int)"))
public static class Split extends TableFunction<Row> {
    public void eval(String line) {
        if (line.length() == 0) {
            return;
        }
        for (String s : line.split(",")) {
            // 来一个字符串, 按照逗号分割, 得到多行, 每行为这个单词和他的长度
            collect(Row.of(s, s.length()));
        }
    }
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<String> stream = env.fromElements("hello,flink,world", "aaa,bbbbb", "");

Table table = tEnv.fromDataStream(stream, $("line"));

// 1. 内联使用
table
    .joinLateral(call(Split.class, $("line")))
    .select($("line"), $("word"), $("len"))
    .execute()
    .print();

table
    .leftOuterJoinLateral(call(Split.class, $("line")))
    .select($("line"), $("word"), $("len"))
    .execute()
    .print();

// 2. 注册后使用
tEnv.createTemporaryFunction("split", Split.class);
table
    .joinLateral(call("split", $("line")))
    .select($("line"), $("word"), $("len"))
    .execute()
    .print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<String> stream = env.fromElements("hello,flink,world", "aaa,bbbbb", "");

Table table = tEnv.fromDataStream(stream, $("line"));

// 1. 注册表
tEnv.createTemporaryView("t_word", table);
// 2. 注册函数
tEnv.createTemporaryFunction("split", Split.class);
// 3. 使用函数
// 3.1 join
tEnv.sqlQuery("select " +
                  " line, word, len " +
                  "from t_word " +
                  "join lateral table(split(line)) on true").execute().print();
// 或者
tEnv.sqlQuery("select " +
                  " line, word, len " +
                  "from t_word, " +
                  "lateral table(split(line))").execute().print();
// 3.2 left join
tEnv.sqlQuery("select " +
                  " line, word, len " +
                  "from t_word " +
                  "left join lateral table(split(line)) on true").execute().print();
// 3.3 join或者left join给字段重命名
tEnv.sqlQuery("select " +
                  " line, new_word, new_len " +
                  "from t_word " +
                  "left join lateral table(split(line)) as T(new_word, new_len) on true").execute().print();
聚合函数

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。

定义函数
// 累加器类型
public static class VcAvgAcc {
    public Integer sum = 0;
    public Long count = 0L;
}

public static class VcAvg extends AggregateFunction<Double, VcAvgAcc> {
    
    // 返回最终的计算结果
    @Override
    public Double getValue(VcAvgAcc accumulator) {
        return accumulator.sum * 1.0 / accumulator.count;
    }
    
    // 初始化累加器
    @Override
    public VcAvgAcc createAccumulator() {
        return new VcAvgAcc();
    }
    
    // 处理输入的值, 更新累加器
    // 参数1: 累加器
    // 参数2,3,...: 用户自定义的输入值
    public void accumulate(VcAvgAcc acc, Integer vc) {
        acc.sum += vc;
        acc.count += 1L;
    }
    
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<WaterSensor> waterSensorStream =
    env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                     new WaterSensor("sensor_1", 2000L, 20),
                     new WaterSensor("sensor_2", 3000L, 30),
                     new WaterSensor("sensor_1", 4000L, 40),
                     new WaterSensor("sensor_1", 5000L, 50),
                     new WaterSensor("sensor_2", 6000L, 60));

Table table = tEnv.fromDataStream(waterSensorStream);

// 1. 内联使用
table
    .groupBy($("id"))
    .select($("id"), call(VcAvg.class, $("vc")))
    .execute()
    .print();

// 2. 注册后使用
tEnv.createTemporaryFunction("my_avg", VcAvg.class);
table
    .groupBy($("id"))
    .select($("id"), call("my_avg", $("vc")))
    .execute()
    .print();
SQL
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<WaterSensor> waterSensorStream =
    env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                     new WaterSensor("sensor_1", 2000L, 20),
                     new WaterSensor("sensor_2", 3000L, 30),
                     new WaterSensor("sensor_1", 4000L, 40),
                     new WaterSensor("sensor_1", 5000L, 50),
                     new WaterSensor("sensor_2", 6000L, 60));

Table table = tEnv.fromDataStream(waterSensorStream);

// 在sql中使用
// 1. 注册表
tEnv.createTemporaryView("t_sensor", table);
// 2. 注册函数
tEnv.createTemporaryFunction("my_avg", VcAvg.class);
// 3. sql中使用自定义聚合函数
tEnv.sqlQuery("select id, my_avg(vc) from t_sensor group by id").execute().print();
表值聚合函数

自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表,结果中可以有多行多列。

定义函数
// 累加器
public static class Top2Acc {
    public Integer first = Integer.MIN_VALUE; // top 1
    public Integer second = Integer.MIN_VALUE; // top 2
}

// Tuple2<Integer, Integer> 值和排序
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Acc> {
    
    @Override
    public Top2Acc createAccumulator() {
        return new Top2Acc();
    }
    
    public void accumulate(Top2Acc acc, Integer vc) {
        if (vc > acc.first) {
            acc.second = acc.first;
            acc.first = vc;
        } else if (vc > acc.second) {
            acc.second = vc;
        }
    }
    
    public void emitValue(Top2Acc acc, Collector<Tuple2<Integer, Integer>> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
        
    }
}
Table API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<WaterSensor> waterSensorStream =
    env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                     new WaterSensor("sensor_1", 2000L, 20),
                     new WaterSensor("sensor_2", 3000L, 30),
                     new WaterSensor("sensor_1", 4000L, 40),
                     new WaterSensor("sensor_1", 5000L, 50),
                     new WaterSensor("sensor_2", 6000L, 60));

Table table = tEnv.fromDataStream(waterSensorStream);
// 1. 内联使用
table
    .groupBy($("id"))
    .flatAggregate(call(Top2.class, $("vc")).as("v", "rank"))
    .select($("id"), $("v"), $("rank"))
    .execute()
    .print();
// 2. 注册后使用
tEnv.createTemporaryFunction("top2", Top2.class);
table
    .groupBy($("id"))
    .flatAggregate(call("top2", $("vc")).as("v", "rank"))
    .select($("id"), $("v"), $("rank"))
    .execute()
    .print();
SQL

SQL 还不支持表值聚合函数

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-27 16:17:12  更:2021-07-27 16:19:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/9 0:02:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码