IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink table api 高级用法 -> 正文阅读

[大数据]flink table api 高级用法

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wm.analyse</groupId>
    <artifactId>wm-offline-analysis-scala-flink</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <scala.version>2.11.8</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>


    </dependencies>


</project>

使用table api 实现车辆行程阶段数据划分聚合


package test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.*;

public class Test1 {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        String filePath = "src\\main\\resources\\testData.csv";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //开启Checkpointing,每5分钟一次
        env.enableCheckpointing(5 * 60, CheckpointingMode.EXACTLY_ONCE);
        //创建table执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //读取本地文件
        DataStream<String> inputStream = env.readTextFile(filePath);
        //生成DataStream
        DataStream<VinBean> dataStraem = inputStream.map(line -> {
            String[] filds = line.split(",");
            return new VinBean(filds[0], Long.parseLong(filds[1]) * 1000, filds[2], Integer.parseInt(filds[3]), Double.parseDouble(filds[4]), Double.parseDouble(filds[5]), Double.parseDouble(filds[6]));
        });
        //设置水印
        DataStream<VinBean> dataStraem2 = dataStraem.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10l))  //设置窗口延迟时间为10s
                .withTimestampAssigner((vinBean, timestamp) -> (vinBean.getCarTime()))  //以vinBean中的carTime为eventtime
                .withIdleness(Duration.ofMinutes(10l))); //如果窗口超过10分钟未触发就自动触发

        Table inputTable = tableEnv.fromDataStream(dataStraem2, $("vin"), $("carTime"), $("battery_code"), $("BMS_DCS_ActOprtMode"),
                $("BMS_MaxAllowChrgCur_DC"), $("BMS_CellVolMax"), $("BMS_CellVolMin"), $("carTime").rowtime().as("ts"));//将carTime作为eventtime

        inputTable.printSchema();

        tableEnv.createTemporarySystemFunction("ifFun", IfFun.class);

        Table aggVin = inputTable
                .window(Over.partitionBy($("vin")).orderBy($("ts")).preceding(rowInterval(1l)).as("w"))//用当前行和上一行划分窗口
                .select($("vin"), $("ts"), $("carTime"), $("carTime").min().over($("w")).as("min_carTime"))//求两行之间的最小时间,从而拿到上一行的时间
                .select($("vin"), $("ts"), $("carTime"), $("min_carTime"), $("carTime").minus($("min_carTime")).isGreater(10 * 1000).then(1, 0).as("flag"))//判断上一行的时间和当前行的时间是否大于10秒,如果大于就标记上1
                .window(Over.partitionBy($("vin")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w2"))//开一个无界窗口
                .select($("vin"), $("ts"), $("carTime"), $("flag").sum().over($("w2")).as("sum_flag"))//从无界窗口中sum flag,这样就能将超过10秒的阶段划分出来
                .window(Over.partitionBy($("vin"), $("sum_flag")).orderBy($("ts")).preceding(UNBOUNDED_RANGE).as("w3"))//再开一个无界窗口
                .select($("vin"), $("ts"), $("carTime").min().over($("w3")).as("start_time"), $("carTime").max().over($("w3")).as("end_time"), $("sum_flag"))//在无界窗口中,求每个阶段的最大时间和最小时间,在每个阶段中,最小时间不会变化,最大时间会随着数据变多而递增
                .window(Tumble.over(lit(10l).seconds()).on($("ts")).as("w4"))//开一个滚动窗口,避免频繁写入数据库
                .groupBy($("vin"),$("w4"),$("sum_flag"))//在滚动窗口中按阶段聚合,每次取最小的开始时间,和最大的结束时间,就能取到每个阶段的起点和当前终点
                .select($("vin"), $("start_time").min().as("start_time"), $("end_time").max().as("end_time"));

        tableEnv.toRetractStream(aggVin, Row.class).print();

        env.execute();

    }

}

用table api实现join获取每条数据5秒之前的数据

提前对两个流的时间进行处理,然后开启滚动窗口,使他们在窗口内join,这样既能避免频繁写数据库和无限join,又能保证在同一时间窗口一定能join上数据

package test;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.*;

public class Test1 {
    public static void main(String[] args) throws Exception {
        //创建执行环境
        String filePath = "src\\main\\resources\\testData.csv";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //开启Checkpointing,每5分钟一次
        env.enableCheckpointing(5 * 60, CheckpointingMode.EXACTLY_ONCE);
        //创建table执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //读取本地文件
        DataStream<String> inputStream = env.readTextFile(filePath);
        //生成DataStream
        DataStream<VinBean> dataStraem = inputStream.map(line -> {
            String[] filds = line.split(",");
            return new VinBean(filds[0], Long.parseLong(filds[1]) * 1000, filds[2], Integer.parseInt(filds[3]), Double.parseDouble(filds[4]), Double.parseDouble(filds[5]), Double.parseDouble(filds[6]));
        });
        //设置水印
        DataStream<VinBean> dataStraem2 = dataStraem.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10l))  //设置窗口延迟时间为10s
                .withTimestampAssigner((vinBean, timestamp) -> (vinBean.getCarTime()))  //以vinBean中的carTime为eventtime
                .withIdleness(Duration.ofMinutes(10l))); //如果窗口超过10分钟未触发就自动触发

        //将当前流的时间加5秒,再自己join自己,这样就能取到每条数据5秒前的数据
        DataStream<VinBean> dataStraem3 = inputStream.map(line -> {
            String[] filds = line.split(",");
            return new VinBean(filds[0], Long.parseLong(filds[1]) * 1000 + 5000, filds[2], Integer.parseInt(filds[3]), Double.parseDouble(filds[4]), Double.parseDouble(filds[5]), Double.parseDouble(filds[6]));
        });
        //设置水印
        DataStream<VinBean> dataStraem4 = dataStraem3.assignTimestampsAndWatermarks(WatermarkStrategy
                .<VinBean>forBoundedOutOfOrderness(Duration.ofSeconds(10l))  //设置窗口延迟时间为10s
                .withTimestampAssigner((vinBean, timestamp) -> (vinBean.getCarTime()))  //以vinBean中的carTime为eventtime
                .withIdleness(Duration.ofMinutes(10l))); //如果窗口超过10分钟未触发就自动触发

        tableEnv.createTemporarySystemFunction("concatFun", ConcatFun.class);

        Table inputTable = tableEnv.fromDataStream(dataStraem2, $("vin"), $("carTime"), $("battery_code"), $("BMS_DCS_ActOprtMode"),
                $("BMS_MaxAllowChrgCur_DC"), $("BMS_CellVolMax"), $("BMS_CellVolMin"), $("carTime").rowtime().as("ts"))
                .window(Tumble.over(lit(10l).seconds()).on($("ts")).as("w"))//开一个滚动窗口,避免频繁写入数据库
                .groupBy($("vin"),$("ts"), $("w"))//在滚动窗口中按阶段聚合,每次取最小的开始时间,和最大的结束时间,就能取到每个阶段的起点和当前终点
                .select($("vin"), $("ts"),$("w").start().as("start"),$("w").end().as("end"))
                .addOrReplaceColumns($("ts").cast(DataTypes.STRING()).as("ts"))//join时不能有时间属性的字段,所以将他转成string类型
                .addColumns(call(ConcatFun.class, $("vin"), $("ts")).as("vin_ts"));//fulljoin和leftjoin只能用一个字段相等才能保留join不上的数据,所以将他们拼成一个字段

        Table inputTable2 = tableEnv.fromDataStream(dataStraem4, $("vin").as("vin2"), $("carTime").as("carTime2"),
                $("BMS_CellVolMax").as("BMS_CellVolMax2"), $("carTime").rowtime().as("ts2"))
                .window(Tumble.over(lit(10l).seconds()).on($("ts2")).as("w"))//开一个滚动窗口,避免频繁写入数据库
                .groupBy($("vin2"),$("ts2"), $("w"))//在滚动窗口中按阶段聚合,每次取最小的开始时间,和最大的结束时间,就能取到每个阶段的起点和当前终点
                .select($("vin2"),$("ts2"))
                .addOrReplaceColumns($("ts2").cast(DataTypes.STRING()).as("ts2"))//join时不能有时间属性的字段,所以将他转成string类型
                .addColumns(call(ConcatFun.class, $("vin2"), $("ts2")).as("vin_ts2"));//fulljoin和leftjoin只能用一个字段相等才能保留join不上的数据,所以将他们拼成一个字段

        inputTable.printSchema();
        inputTable2.printSchema();
        Table joined = inputTable.leftOuterJoin(inputTable2, $("vin_ts").isEqual($("vin_ts2")));
        joined.printSchema();


        tableEnv.toRetractStream(joined, Row.class).print();

        env.execute();

    }

}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-22 13:36:15  更:2021-08-22 13:38:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:48:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码