IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink水位线之watermark原理及实战 -> 正文阅读

[大数据]Flink水位线之watermark原理及实战

watermark概念

Flink 实际上是用 watermarks来实现 Event - Time 的功能。watermark在Flink中也属于特殊事件,其精髓在于当某个运算值收到 。带有时间戳“T”的watermarks时就意味着它不会接收到新的数据了。

使用watermark的好处在于可以准确预估收到数据的截止时间。举例,假设预期收到数据时间与输出结果时间的时间差延迟 5分钟,那么Flink 中所有的windows Operator 搜索 3点至4点的数据,但因为存在延迟需要再多等5 分钟直至收集完4:05分的数据,此时方能判定4点钟的资料收集完成了,然后才会产出3点至4点的数据结果。这个时间段的结果对应的就是watermarks 的部分。

watermark作用

? ? watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

? ? 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

? ? 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

watermark示

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //设置时间分配器

        env.setParallelism(1);  //设置并行度
        env.getConfig().setAutoWatermarkInterval(9000);//每9秒发出一个watermark

        DataStream<String> text = env.socketTextStream("localhost", 9900);

        DataStream<Tuple3<String, Long, Integer>> counts = text.filter(new FilterClass()).map(new LineSplitter())
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                    private long currentMaxTimestamp = 0l;
                    private final long maxOutOfOrderness = 10000l;   //这个控制失序已经延迟的度量
                    //获取EventTime
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println(
                                "get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
                        return timestamp;
                    }
                    //获取Watermark
                    @Override
                    public Watermark getCurrentWatermark() {
                        System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark "
                                + (currentMaxTimestamp - maxOutOfOrderness));
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }
                }).keyBy(0).timeWindow(Time.seconds(5))
                // .allowedLateness(Time.seconds(10))
                .sum(2);
        counts.print();
        env.execute("Window WordCount");


图片

图片,1625970416002(2021-07-11 10:26:56),1625970418002(2021-07-11?10:26:58),1625970419002(2021-07-11?10:26:59),1625970453002(2021-07-11 10:27:33)

我们水位线设置的9秒,当时间超过9秒时,那么会触发计算,从而会有下面的输出.触发计算的时间点是

  1. watermark超过了window的endtime.

  2. 在该window中有数据.

只有同时满足这两个条件,就会触发计算.

如果觉得文章能帮到您,欢迎关注微信公众号:“蓝天Java大数据” ,共同进步!

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-23 10:51:49  更:2021-07-23 10:52:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 13:22:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码