IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发测试 -> FlinkSQL-自定义表聚合函数TableAggregateFunction -> 正文阅读

[开发测试]FlinkSQL-自定义表聚合函数TableAggregateFunction

什么是表聚合函数

表聚合,多对多,多行输入多行输出

  • 用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAF),可以把一个表中数据,聚合为具有多行和多列的结果表
  • 用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的

表聚合函数的实现

TableAggregateFunction 要求必须实现的方法

  • createAccumulator()
  • accumulate()
  • emitValue()

表聚合函数的工作原理

  • 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器
  • 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器
  • 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果

代码实现

//实现自定义的AggregateFunction<结果值,中间状态>
    public static class Top2 extends TableAggregateFunction<Tuple2<Double, Integer>, Tuple2<Double,Double>> {
        //输出结果Tuple2<温度, 排名>
        public void emitValue(Tuple2<Double, Double> acc, Collector<Tuple2<Double, Integer>> out) {
            if (acc.f0 != Double.MIN_VALUE) {
                out.collect(Tuple2.of(acc.f0, 1));
            }
            if (acc.f1 != Double.MIN_VALUE) {
                out.collect(Tuple2.of(acc.f1, 2));
            }
        }
        //初始化累加器
        @Override
        public Tuple2<Double, Double> createAccumulator() {
            return new Tuple2<Double,Double>(Double.MIN_VALUE,Double.MIN_VALUE);
        }
        //必须实现一个accumulate(中间状态,传入的一条数据)方法,来数据之后更新状态
        public void accumulate(Tuple2<Double,Double> acc, Double temp){
            if (temp > acc.f0) {
                acc.f1 = acc.f0;
                acc.f0 = temp;
            } else if (temp > acc.f1) {
                acc.f1 = temp;
            }
        }
    }

测试用例

		//流转换成表 Tuple3<String,Long,Double>
        Table sourceTable = tableEnv.fromDataStream(dataStream, "f0 as id, f1 as ts, f2 as temp,pt.proctime");
        //在环境中注册UDTAF
        Top2 top2 = new Top2();
        tableEnv.registerFunction("top2",top2);
        //TableAPI
        Table resultTable = sourceTable.groupBy("id")
                .flatAggregate("top2(temp) as (temp,rank)")
                .select("id,temp,rank");
        tableEnv.toRetractStream(resultTable, Row.class).print();
        //暂不支持SQL方式调用
  开发测试 最新文章
pytest系列——allure之生成测试报告(Wind
某大厂软件测试岗一面笔试题+二面问答题面试
iperf 学习笔记
关于Python中使用selenium八大定位方法
【软件测试】为什么提升不了?8年测试总结再
软件测试复习
PHP笔记-Smarty模板引擎的使用
C++Test使用入门
【Java】单元测试
Net core 3.x 获取客户端地址
上一篇文章      下一篇文章      查看所有文章
加:2021-07-30 13:02:26  更:2021-07-30 13:03:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/3 1:15:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码