IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink入门第八课:DataStream api的watermark的相关操作 -> 正文阅读

[大数据]Flink入门第八课:DataStream api的watermark的相关操作

package com.atguigu.Ctime;

import com.atguigu.Fbeans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;


/**
 * watermark是一条带有时间戳的单调递增的数据记录,它出现在数据流中意味着watermark之前的数据已全部到齐,
 *      此时如果没有设置allowedLateness,直接结束窗口触发最终计算;如果设置了,则先触发一次计算,
 *      然后按allowedLateness规定时间进行等待,等待过程中数据来一条计算一次,等待结束立马关闭窗口。
 *      窗口关闭后watermark以广播的方式传递会下游算子。由于任务是并行的,下游算子接收到watermark可能有多个,
 *      在当前subtask以最少小的为准逐个处理watermark。
 * 设置watermark一般在分组之前,allowedLateness是在分组之后。
 *
 * watermaker=当前最大事件时间-允许延迟时间。
 * watermaker>=窗口结束时间的时候,该窗口会触发计算。
 *
 * datastaream.assignTimestampsAndWatermaker(new T())
 *  T可以是AscendingTimestampExtractor(数据升序时用,不需要指定延迟时间)
 *  也可以是BoundedOutOfOrdernessTimestampExtractor(数据有界乱序时用)。
 *  在T的方法内获取实体类的时间字段作为事件时间,方法内时间单位是毫秒。
 *
 *  watermark可以来一条数据生成一次,也可以按时间间隔来生成。
 *  默认200ms生成一次可以通过env.getConfig().setAutoWatermarkInterval()来设置。
 *
 * 只能解决一定程度上的数据乱序和数据延时问题,完全解决需要使用OutputTag+sideOutputLateData侧道输出机制,
 * 相当于将allowedLateness配置的最大延迟之后的数据收集放入OutputTag中再输出。当然也可以对其做一些计算再输出。
 *
 * 设置事件时间:env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 数据流中必须有事件时间字段
 *
 */
public class AWaterMarkTest {
    public static void main(String[] args) throws Exception {
        //创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //读取数据并包装成pojo
        DataStreamSource<String> inputStream = env. socketTextStream("localhost",7777);
        DataStream<SensorReading> mapStream = inputStream.map(line -> {
            String[] splits = line.split(",");
            //这儿toString是为了数据传输时方便使用simpleStringSchema
            return new SensorReading(new String(splits[0]), new Long(splits[1]), new Double(splits[2]));
        });

        /**
         * 分配watermark
         *  watermark支持两个类:BoundedOutOfOrdernessTimestampExtractor和AscendingTimestampExtractor
         *  两个类都是用来设置watermark的,watermark的时间单位是毫秒。但是:
         *      前者处理有界乱序数据,需要设置好延迟时间。
         *      后者处理有序数据,不需要设置延迟时间。一般前者用得更多。
         *  watermark可以来一条数据生成一次(用得少),也可以按间隔来生成,默认是200ms生成一次。
         *  我们可以通过env.getConfig().setAutoWatermarkInterval(Time.seconds(200))来手动指定生成间隔。
         *
         */
        SingleOutputStreamOperator<SensorReading> waterStream = mapStream.assignTimestampsAndWatermarks(
                //watermark的延迟是10s
                new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(SensorReading sen) {
                        //提取字段中的事件时间。watermark时间单位是毫秒
                        return sen.getTimestamp() * 1000L;
                    }
                });
        /*mapStream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<SensorReading>() {
            @Override
            public long extractAscendingTimestamp(SensorReading sen) {

                return sen.getTimestamp()*1000L;
            }
        })*/

        /**
         * 开始计算,获取最近十五秒的最小温度记录,不包含watermark允许最大延迟1min
         *  超过最大延迟的数据加入侧输出流后再输出
         *
         */
        OutputTag<SensorReading> tag= new OutputTag<SensorReading>("late"); //封装延时数据
        SingleOutputStreamOperator<SensorReading> result = waterStream.keyBy(SensorReading::getId)
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1)) //watermark到达窗口结束时间后还允许的最大延迟
                .sideOutputLateData(tag)  //侧道输出机制
                .minBy(2);

        //输出并执行
        result.print("测试watermark操作");
        result.getSideOutput(tag).print("late");//打印侧输出流数据,这儿也可以对侧输出流做一些计算再输出
        env.execute("测试watermark操作");
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 15:27:43  更:2021-08-17 15:30:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:29:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码