IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 函数 -> 正文阅读

[大数据]Flink 函数

Flink 函数

相关博客:

Flink-函数 | 用户自定义函数(UDF)标量函数 | 表函数 | 聚合函数 | 表聚合函数

一、Flink Table API 和 SQL 内置函数

Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。

SQL中支持的很多函数,Table API 和 SQL都已经做了实现

  • 比较函数

    • SQL:

      value1 = value2

      value1 > value2

    • Table API

      ANY1 === ANY2

      ANY1 > ANY2

  • 逻辑函数

    • SQL:

      boolean1= boolean2

      boolean IS FALSE

      NOT boolean

      boolean IS FALSE

      NOT boolean

    • Table API

      BOOLEAN1 || BOOLEAN2

      BOOLEAN.isFalse

      !BOOLEAN

  • 算数函数

    • SQL:

      numeric1 + numeric2

      POWER(numeric1, numeric2)

    • Table API

      NUMERIC1 + NUMERIC2

      NUMERIC1.POWER(NUMERIC2)

  • 字符串函数

    • SQL:

      string1 + string2

      UPPER(string)

      CHAR_LENGTH(string)

    • Table API

      STRING1 + STRING2

      STRING.upperCase()

      STRING.charLength()

  • 时间函数

    • SQL:

      DATE string

      TIMESTAMP string

      CURRENT_TIME

      interval string range

    • Table API

      STRING.toDate

      STRING.toTimestamp

      currentTime()

      NUMERIC.days

  • 聚合函数

    • SQL:

      COUNT()

      SUM(expression)

      RANK()

      ROW_NUMBER()

    • Table API

      FIELD.count

      FIELD.sum

二、用户自定义函数(UDF)

用户定义函数(User-defined Functions,UDF)是一个重要的特性,它们显著地扩展了查询的表达能力.

一些系统内置函数无法解决的需求,可以用UDF来自定义实现

在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用

函数通过调用 registerFunction() 方法在 TableEnvironment 中注册。当用户定义的函数被注册时,它被插入到 TableEnvironment 的函数目录中,这样Table API 或 SQL 解析器就可以识别并正确地解释它。

sql函数有两大类型:

  • scalar Function类似于map,一对一
  • Table Function类似与flatMap,一对多

2.1 标量函数(Scalar Functions)

定义标量函数,可以将0、1或多个标量值,映射到新的标量值

为了定义标量函数,必须扩展基类ScalarFunction,并实现求值(eval)方法。

标量函数的行为由求值方法决定,求值方法必须public公开声明并命名为eval

例如:

package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/9 15:44
 */

public class UDFTest1_Scalar {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] field = line.split(",");
            return new SensorReading(field[0], new Long(field[1]), new Double(field[2]));
        });

        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");

        // 自定义标量函数,实现求id的hash值
        HashCode hashCode = new HashCode(23);
        // 注册UDF函数
        tableEnv.registerFunction("hashcode",hashCode);

        // Table API
        Table resultTable = table.select("id, ts, hashcode(id)");

        // SQL
        tableEnv.createTemporaryView("sensor",table);
        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, hashcode(id) from sensor");

        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");

        env.execute();

    }

    public static class HashCode **extends ScalarFunction**{
        private int factor = 13;
        public HashCode(int factor){
            this.factor = factor;
        }

        public int eval(String id){
            return id.hashCode() * 13;
        }

    }
}

2.2 表函数(Table Function)

用户定义的表函数,也可以将0、1或多个标量值作为输入参数,与标量函数不同,它可以返回任意数量的行作为输出,而不是单个值。

为了定义一个表函数,必须扩展TableFunction并实现求值方法。

同样的,表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。

例如:

package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/9 16:01
 */

public class UDFTest2_Table {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 自定义表函数,类似于flatMap
        Spilt split = new Spilt("_");
        // 注册表函数
        tableEnv.registerFunction("split",split);

        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        Table table = tableEnv.fromDataStream(dataStream, "id,timestamp as ts,temperature");
        tableEnv.createTemporaryView("sensor",table);

        Table resultTable = table.joinLateral("split(id) as (word, length)").select("id, ts, word, length");

        Table resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length " +
                "from sensor, lateral table(split(id)) as splitid(word, length)");

        tableEnv.toAppendStream(resultTable, Row.class).print("res");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }
    
    // 将id分割,并分别返回对应的长度
    public static class Spilt extends TableFunction<Tuple2<String,Integer>>{
        public String sep = ",";
        public Spilt(String sep){
            this.sep = sep;
        }

        public void eval(String str){
            for (String s : str.split(sep)) {
                collect(new Tuple2<>(s,s.length()));
            }
        }
    }
}

2.3 聚合函数(Aggregate Function)

聚合,多对一,类似前面的窗口聚合

用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。

用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实现的。

在这里插入图片描述

  • AggregationFunction要求必须实现的方法
    • createAccumulator
    • accumulate
    • getValue
  • AggregationFunction的工作原理如下
    1. 首先,它需要一个累加器(Accumulator),用来保存聚合中间结果的数据结构;可以通过调用createAccumulator()方法初始化累加器。
    2. 随后,对每一个输入行调用函数的accumulate()方法来更新累加器。
    3. 处理完所有行后,将调用getValue()方法来计算并返回最终结果。

例如:

package com.root.udf;

import com.root.SensorReading;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

/**
 * @author Kewei
 * @Date 2022/3/9 16:23
 */

public class UDFTest3_Aggra {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 实例一个求平均的函数
        AvgTemp avgTemp = new AvgTemp();
        // 注册一个UDAF
        tableEnv.registerFunction("avgTemp",avgTemp);

        Table table = tableEnv.fromDataStream(dataStream, "id, timestamp as ts, temperature as temp");
        tableEnv.createTemporaryView("sensor",table);

        // 以下两种Table API写法结果相同
        Table result = table.groupBy("id").select("id, avgTemp(temp)");
        Table result2 = table
                .groupBy("id")
                .aggregate("avgTemp(temp) as avgtemp")
                .select("id, avgtemp");

        // sql
        Table resultSql = tableEnv.sqlQuery("select id,avgTemp(temp) from sensor group by id");

        tableEnv.toRetractStream(result2, Row.class).print("res");
        tableEnv.toRetractStream(resultSql,Row.class).print("sql");

        env.execute();

    }
    
    // 计算平均温度
    public static class AvgTemp extends AggregateFunction<Double, Tuple2<Double,Integer>>{

        @Override
        public Double getValue(Tuple2<Double, Integer> value) {
            return value.f0/value.f1;
        }

        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }

        public void accumulate(Tuple2<Double,Integer> acc, Double value){
            acc.f0 += value;
            acc.f1 += 1;
        }
    }
}

2.4 表聚合函数(Table Function)

用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一个表中数据,聚合为具有多行和多列的结果表。

用户定义表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。

在这里插入图片描述

  • AggregationFunction 要求必须实现的方法:

    • createAccumulator()
    • accumulate()
    • emitValue()
  • TableAggregateFunction 的工作原理如下:

    1. 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用 createAccumulator() 方法可以创建空累加器。
    2. 随后,对每个输入行调用函数的 accumulate() 方法来更新累加器。
    3. 处理完所有行后,将调用函数的 emitValue() 方法来计算并返回最终结果。

    例如:

    package com.root.udf;
    
    import com.root.SensorReading;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    /**
     * @author Kewei
     * @Date 2022/3/9 16:40
     */
    
    public class UDFTest4_TableAgg {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            DataStreamSource<String> inputStream = env.readTextFile("data/sensor.txt");
            SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(1)) {
                        @Override
                        public long extractTimestamp(SensorReading value) {
                            return value.getTimestamp() * 1000L;
                        }
                    });
    
            // 实例一个自定义聚合表函数
            MyAggTable myAggTable = new MyAggTable();
            
            // 注册自定义聚合表函数
            tableEnv.registerFunction("myAgg",myAggTable);
    
            Table table = tableEnv.fromDataStream(dataStream, "id, temperature as temp, timestamp.rowtime as ts");
            tableEnv.createTemporaryView("sensor",table);
    
            // 使用
            Table result = table.groupBy("id")
                    .flatAggregate("myAgg(temp) as (temp, rank)")
                    .select("id, temp, rank");
            // 表聚合函数 不支持 sql调用
    
            tableEnv.toRetractStream(result, Row.class).print("res");
    
            env.execute();
    
        }
    
        // 创建一个类,保存排名第一和第二的温度
        public static class AggTabTempAcc{
            public Double highestTemp;
            public Double secondHighestTemp;
            public AggTabTempAcc(){
                highestTemp = Double.MIN_VALUE;
                secondHighestTemp = Double.MIN_VALUE;
            }
        }
    
        // 创建一个TableAggregateFunction函数,用于统计出,同一id排名第一和第二的温度
        public static class MyAggTable extends TableAggregateFunction<Tuple2<Double,Integer>,AggTabTempAcc>{
    
            // 初始化累加器
            @Override
            public AggTabTempAcc createAccumulator() {
                return new AggTabTempAcc();
            }
    
            // 更新累加器
            public void accumulate(AggTabTempAcc acc,Double temp){
                if (temp > acc.highestTemp){
                    acc.secondHighestTemp = acc.highestTemp;
                    acc.highestTemp = temp;
                } else if (temp > acc.secondHighestTemp){
                    acc.secondHighestTemp = temp;
                }
            }
    
            // 输出累加器
            public void emitValue(AggTabTempAcc acc, Collector<Tuple2<Double, Integer>> col){
                col.collect(new Tuple2<>(acc.highestTemp,1));
                col.collect(new Tuple2<>(acc.secondHighestTemp,2));
            }
        }
    
    }
    
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-07 22:46:53  更:2022-04-07 22:47:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:53:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码