IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【Flink】WaterMark与EventTime -> 正文阅读

[大数据]【Flink】WaterMark与EventTime

【Flink重难点辨析】WaterMark与EventTime

WaterMark

Flink中测量事件时间的进度的机制就是watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t。
一个Watermark(t)表示在这个流里面事件时间已经到了时间t, 意味着此时, 流中不应该存在这样的数据: 他的时间戳t2<=t (时间比较旧或者等于时间戳)。
1、衡量事件时间的进展
2、是一个特殊的时间戳,生成之后随着流的流动而向后传递(广播方式)
3、用来处理数据乱序的问题。
4、触发窗口等的计算、关闭
5、单调递增的(时间不能倒退),与key无关。
6、Flink认为,小于Watermark时间戳的数据处理完了,不应该再出现

有序流中的水印

事件是有序的(生成数据的时间和被处理的时间顺序是一致的), watermark是流中一个简单的周期性的标记。
在这里插入图片描述

有序场景:
1、底层调用的也是乱序的Watermark生成器,只是乱序程度传了一个0ms。
2Watermark = maxTimestamp – outOfOrdernessMills – 1ms
			 = maxTimestamp – 0ms – 1ms
			 =>事件时间 – 1ms

乱序流中的水印

按照时间戳来看, 事件是乱序的, 则watermark对于这些乱序的流来说至关重要
在这里插入图片描述

乱序场景:
1、什么是乱序 => 时间戳大的比时间戳小的先来
2、乱序程度设置多少比较合适?   
a)经验值 => 对自身集群和数据的了解,大概估算。
b)对数据进行抽样。  --->抽样,算最大乱序程度
c)肯定不会设置为几小时,一般设为 秒 或者 分钟。
3Watermark = maxTimestamp – outOfOrdernessMills – 1ms
             =>当前最大的事件时间 – 乱序程度(等待时间)- 1ms 

EventTime

事件时间是指的这个事件发生的时间.
事件时间程序必须制定如何产生Event Time Watermarks(水印) . 在事件时间体系中, 水印是表示时间进度的标志(作用就相当于现实时间的时钟).
注意:在Flink1.12之前默认的时间语义是处理时间, 从1.12开始, Flink内部已经把默认的语义改成了事件时间

EventTime和WaterMark的使用

1.Monotonously Increasing Timestamps(时间戳单调增长:允许的延迟为0) —>有序

//分配时间戳和水印
waterSensorDStream.assignTimestampsAndWatermarks(WatermarkStrategy.
		//水印策略:时间戳单调增长
		<WaterSensor>forMonotonousTimestamps()
		//指定时间戳
		.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { 
              @Override 
              public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                  return element.getTs() * 1000;
              }
          })
); 

2.Fixed Amount of Lateness(允许固定时间的延迟) —>乱序

//分配时间戳和水印
waterSensorDStream.assignTimestampsAndWatermarks(WatermarkStrategy.
		//水印策略:固定时间的延迟
		.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) //最大容忍的延迟时间
		//指定时间戳
		.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() { 
              @Override 
              public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                  return element.getTs() * 1000;
              }
          })
); 

多并行度下WaterMark的传递

多并行度下WaterMark生成位置与传递测试

测试声明:全局并行度2
测试流程:source —> map —> keyby
1.测试数据准备

1001,1,12
1002,6,23

2.测试用JavaBean准备:WaterSensor

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WaterSensor {
    private String id;
    private Long ts;
    private Double vc;
}

3.测试程序1:MaxVC1 source(WaterMark) —> map —> keyby

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MaxVC1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        
        DataStreamSource<String> dataStreamSource = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator<String> outputStreamOperator = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                String[] fields = element.split(",");
                return Long.parseLong(fields[1]) * 1000L;
            }
        }));

        SingleOutputStreamOperator<WaterSensor> waterSensorDS = outputStreamOperator.map(line -> {
            String[] fields = line.split(",");
            return new WaterSensor(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });
        SingleOutputStreamOperator<WaterSensor> result = waterSensorDS.keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        return new WaterSensor(value1.getId(), value1.getTs(), Math.max(value1.getVc(), value2.getVc()));
                    }
                });

        result.print();
        env.execute();
    }
}
输入控制台打印输出
1001,1,12
1002,6,231> WaterSensor(id=1001, ts=1, vc=12.0)

在这里插入图片描述
4.测试程序2:MaxVC2 source —> map(WaterMark) —> keyby

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class MaxVC2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<String> dataStreamSource = env.socketTextStream("hadoop102", 9999);
        
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = dataStreamSource.map(line -> {
            String[] fields = line.split(",");
            return new WaterSensor(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<WaterSensor>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
            @Override
            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                return element.getTs() * 1000L;
            }
        }));

        SingleOutputStreamOperator<WaterSensor> result = waterSensorDS.keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        return new WaterSensor(value1.getId(), value1.getTs(), Math.max(value1.getVc(), value2.getVc()));
                    }
                });
        result.print();
        env.execute();
    }
}
输入控制台打印输出
1001,1,12
1002,6,23
1002,6,231> WaterSensor(id=1001, ts=1, vc=12.0)

在这里插入图片描述

多并行度下WaterMark传递总结:

1.多并行度的条件下, 向下游传递WaterMark的时候是以广播的方式传递的
2.总是以最小的那个WaterMark为准! 木桶原理!
3.并且当watermark值没有增长的时候不会向下游传递,注意:生成不变。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-28 11:22:04  更:2021-11-28 11:22:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:04:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码