IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink多并行度下WaterMark的设计区别 -> 正文阅读

[大数据]Flink多并行度下WaterMark的设计区别

?问题的提出

? 对于WaterMark设计的位置是否会影响窗口的正常开闭?

??下面我模拟了两种情景(source并行度为1,map并行度为2),分别是
1.在source后设置watermark,经过map后开窗
2.在map后设置watermark,然后开窗

ps:? ? ? ?下面的两种代码我都设置了自然增长的watermark,窗口时间都是5秒,只是设置watermark的位置不同

????????????????watermark是testWM对象的ts字段*1000

?代码一:在Source后添加WaterMark

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // TODO: 2021/12/1 在source后设置watermark 
        SingleOutputStreamOperator<String> sourceWithWM = source.assignTimestampsAndWatermarks(WatermarkStrategy
                .<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        String[] split = element.split(",");
                        return Long.parseLong(split[2]) * 1000;
                    }
                }));
        // TODO: 2021/12/1 设置map并行度为2 
        SingleOutputStreamOperator<testWM> mapDS = sourceWithWM.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
        }).setParallelism(2);
        SingleOutputStreamOperator<String> resultDS = mapDS.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("我关窗啦");
                    }
                });
        resultDS.print();
        env.execute();

    }
}
@Data
@AllArgsConstructor
class testWM{
    private int id;
    private int num;
    private long ts;
}

?代码二:在Map后设置WaterMark

public class WMTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
        // TODO: 2021/12/1 设置map并行度为2
        SingleOutputStreamOperator<testWM> mapDS = source.map(r -> {
            String[] split = r.split(",");
            return new testWM(Integer.parseInt(split[0]), Integer.parseInt(split[1]),Long.parseLong(split[2]));
        }).setParallelism(2);
        // TODO: 2021/12/1  在map后添加watermark
        SingleOutputStreamOperator<testWM> mapWithWM = mapDS.assignTimestampsAndWatermarks(WatermarkStrategy
                .<testWM>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<testWM>() {
                    @Override
                    public long extractTimestamp(testWM element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                }));
        SingleOutputStreamOperator<String> resultDS = mapWithWM.keyBy(r -> r.getId())
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<testWM, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<testWM> elements, Collector<String> out) throws Exception {
                        out.collect("我关窗啦");
                    }
                });
        resultDS.print();
        env.execute();

    }
}
@Data
@AllArgsConstructor
class testWM{
    private int id;
    private int num;
    private long ts;
}

运行结果:

对于第一种,在source后添加watermark的结果如下:

当1,1,1这条数据进入时,开启了[0,5)这个窗口,当1,1,9这条数据进入时,watermark升至9000(忽略watermark的减1).窗口关闭,没有问题

对于第二种,在map后添加watermark的结果如下:

?可以很明显的发现,当第一条1,1,9进入时,[0,5)这个窗口并没有关闭.直到第二条1,1,9进入时,窗口才被关闭,这是为什么?

我针对以上两种情况画了下图来理解.

?图一.图二描绘了source之后设置watermark的场景,一般来说,这是我们生产中需要的

?我在之前的文章中提到过,WaterMark以广播的形式向下游发送,并且如果同时接收上游多条并行度的WaterMark时,以小的为准

这就导致图三(Map后设置WaterMark)中,我需要发送两条足够[0,5)这个窗口关闭的数据,才能真正关闭窗口,因为数据要经过轮询才能
到达每个窗口.

拓展:

在KafkaSource中,已经做了很好得优化,在生产中我们一般设置并行度与topic分区数相同

如果设置得并行度比topic分区数多,那必然有并行度消费不到数据,就会导致WaterMark一直保持在Long.min_value.

当这种WaterMark向下游广播之后,会导致所有正常并行度的窗口全部无法关闭,因为WaterMark取了各个并行度的最小值

但是当这种状态保持一段时间之后,程序会在计算WaterMark的时候,自动过滤掉迟迟没有数据的并行度传进来的WaterMark,这就是KafkaSource的优化.

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-02 16:50:07  更:2021-12-02 16:50:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:51:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码