IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink中的时间语义和watermark -> 正文阅读

[大数据]Flink中的时间语义和watermark

先了解两个概念

首先要先知道时间语义和watermark 是什么

时间语义

image.png

在flink的数据处理流程中,有三个重要的时间概念,如上图所示分别是

  • Event Time:事件创建的时间(也就是数据生成的时间)
  • Ingestion Time:数据进入Flink的时间
  • Processing Time:对数据执行计算的时间,为本地系统时间与机器相关

在flink中有这三种时间语义,在默认情况下flink是按照Processing Time的时间来输出数据,但是往往由于分布式,并行度和网络拥挤的原因,流式数据传入到flink算子时已经发生了乱序,而按照乱序时Processing Time的时间进行输出,就与数据真实生成时的Event Time时间的顺序发生了改变。由此看来我们往往更关心事件时间Event Time
在处理时间流数据的时候,管道输出到算子往往并不是按照时间的顺序进行输出了,这样就产生了一个很严重的问题,在进行时间窗口下的计算时候,如果时间顺序靠后的数据比时间顺序靠前的数据先进入了窗口,那窗口是关闭还是不关闭?显然是必须关闭,但是这样的话这个窗口的数据计算就会由于某些数据的丢失而受到影响,从而使得窗口计算的不准确。
怎么避免乱序数据带来计算不正确?
下面引入水位线Watermark概念

水位线

  • Watermark是一种衡量 Event Time 进展的机制,通常运用在时间窗口的延迟触发
  • 当窗口遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等
    待一段时间(Watermark),等迟到的数据来了再关闭窗口
  • Watermark可以理解成一个延迟触发机制,通过设置Watermark的延时时间t,每次系统会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime-t的所有数据已经到达,如果有窗口的停止时间等于maxEventTime-t,那么这个窗口被触发执行。

水位线的特点

  • watermark 是一条特殊的数据记录
  • watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不
    是在后退
  • watermark 与数据的时间戳相关
  • watermark是让程序自己平衡延迟和结果正确性(值一半要反复测试)

举例说明

19BF9AB83ED3547824249F48BD3338D0.png

引入时间语义和watermark

引入时间语义

在 Flink 的流式处理中,绝大部分的业务使用到的时间语义都是eventTime,代码中引入,如下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/**
* 参数 TimeCharacteristic 有三种类型:
* ProcessingTime,
* IngestionTime,
* EventTime;
*/
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

引入watermark

引入watermark,实际上就是设置延迟时间和提取并设置 eventTime。最常用的是调用 assignTimestampAndWatermarks方法,传入延迟时间和重写extractTimestamp方法设置eventTime即可

要注意:eventTime的使用,一定要指定数据源中的时间戳,不然即使引入了eventTime时间语义,程序依旧会用Processing Time
下面还是以传感器为例

SingleOutputStreamOperator<SensorReading> watermarkDataStream = sensorDataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2L)) {
            @Override
            public long extractTimestamp(SensorReading sensorReading) {
                // 提取 eventTime 需要注意的是 该 eventTime 是毫秒单位,如果 sensorReading 的时间戳是以秒单位需要 乘以 1000
                return sensorReading.getTimestamp();
            }
        });

watermark的更新

关于watermark不妨想到一个问题
watermark什么时候更新呢? 是一来数据就更新,不来数据就不更新还是周期性的更新呢?
其实Flink 暴露了 TimestampAssigner 接口供我们实现,使我们可以自定义如
何从事件数据中抽取时间戳和生成watermark

dataStream.assignTimestampsAndWatermarks(new MyAssigner());

其中TimestampAssigner接口中有两个方法
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
分别对应着周期性更新Watermark和间断性更新Watermark

Assigner with periodic watermarks(周期性更新)

周期性的生成 watermark:系统会周期性的将 watermark 插入到流中(水位线也是一种特殊的事件!)。默认周期是 200 毫秒。可以使用
ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置 EventTime 特征
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 设置 Watermark 周期性更新
        env.getConfig().setAutoWatermarkInterval(500L);

当然也可以重写自定义,但是不提倡这么做,有很多细节不好把握(Flink中既然有现场的getCurrentWatermark()方法帮你封装好了所有细节只用改一下配置时间,这为什么不用呢?)

产生 watermark 的逻辑:每隔 5 秒钟,Flink 会调用
AssignerWithPeriodicWatermarks 的 getCurrentWatermark()方法。
如果方法返回一个时间戳大于之前水位的时间戳,新的 watermark 会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermar

Assigner with punctuated watermarks(间断性更新)

间断式地生成 watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理

这个方法是按照需求更新,只能自定义了

下面给出只考虑实现内容的自定义代码

案例

根据传感器id等于sensor_1的数据,才提取相对应的watermark,插入数据流中。

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;

public class Watermark_CustomPunctuatedWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 不再设置周期行性获取watermark
        // env.getConfig().setAutoWatermarkInterval(500L);
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.assignTimestampsAndWatermarks(new CustomPunctuatedWatermark());
        resultDataStream.print();
        env.execute();

    }

    public static class CustomPunctuatedWatermark implements AssignerWithPunctuatedWatermarks<SensorReading> {

        // 延迟 2s
        private long bound = 2 * 1000L;

        /**
         * @param lastElement      -> 上一条数据
         * @param extractTimestamp -> 当前数据的时间戳 根据 extractTimestamp 方法获取
         * @return
         */
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(SensorReading lastElement, long extractTimestamp) {
            // 如果上一条数据的id 等于 sensor_1 则更新时间戳 否则返回 null
            if ("sensor_1".equals(lastElement.getId())) {
                return new Watermark(extractTimestamp - bound);
            } else {
                return null;
            }
        }

        /**
         * @param sensorReading            -> 当前数据
         * @param previousElementTimestamp -> 上一条数据的事件事件戳
         * @return
         */
        @Override
        public long extractTimestamp(SensorReading sensorReading, long previousElementTimestamp) {
            return sensorReading.getTimestamp();
        }
    }
}

watermark 的设定

  • 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的
    领域有一定的了解
  • 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办
    法是在水位线到达之前输出一个近似结果
  • 而如果watermark到达得太早,则可能收到错误结果,不过 Flink 处理迟
    到数据的机制可以解决这个问题

欢迎交流学习

个人博客

掘金主页

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-24 11:33:51  更:2021-07-24 11:35:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 7:30:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码