IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink源码解析:基于事件时间场景下WaterMark源码分析 -> 正文阅读

[大数据]Flink源码解析:基于事件时间场景下WaterMark源码分析

系列文章目录

Flink源码分析: 重启策略机制RestartStrategy

Flink源码分析: 广播状态流实现动态更新或字段参数变更

Flink源码分析: Flink JDBC Upsert模式实现原理

前言

这周比较忙,没时间写博客,趁着周五抓紧写一篇,保证业绩。努力的打工人

前两天在群里看到一个小伙伴说他在面试时被面试官问了一套Flink的WaterMark源码,被问了简直一脸懵逼,最后惨遭回去等通知吧....想想自从把公司生产线的Flink从1.10升级到1.12版本后对WaterMark的源码也没怎么深究过,主要是升级到新版本后一直在研究计算SQL话的问题,作为一个多年老司机,怎么可以会了SQL忘了代码的事呢,今天咱们就一起学习下WaterMark的源码吧,当然大家在学习前可以看下我之前写的一篇新版本的WaterMark原理:

https://mp.csdn.net/mp_blog/creation/editor/118314168


使用

Flink1.12版本中建议使用assignTimestampsAndWatermarks来抽象定义水印的生成:

dataStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(delayTime))

其中WatermarkStrategy继承了TimestampAssignerSupplier<T>接口和WatermarkGeneratorSupplier<T>接口。

先看下TimestampAssignerSupplier<T>的定义:

?创建一个类型为TimestampAssigner的对象,那这个对象有什么作用呢?我们再点进去看一下

?这个接口的作用主要是给每条数据分配一个时间戳,所以说当使用水印时继承了TimestampAssignerSupplier会分配一个时间戳。

WatermarkGeneratorSupplier<T>接口的定义:

onEvent: 每条消息都会调用该方法,传入的消息,消息携带的事件时间,和水印发射器。

onPeriodicEmit:每段时间都会调用该方法,入参是WatermarkOutputoutput,用户可以通过这个方法每隔一段时间发送一次水印,当记录数过多时,每条记录都发送一次水印明显不合适,也影响性能,此时可以通过这个方法进行水印的定时发送,而onEvent只记录当前水印而选择不发射出去。该方法的参数配置为env.getConfig().setAutoWatermarkInterval(300L),入参是毫秒数,表示隔多少毫秒向下游算子发送一次水印。

而WatermarkStrategy中也提供了一些常用的WatermarkGenerator<T>供用户使用,比如

BoundedOutOfOrdernessWatermarks<T>类中就是一个在onEvent中记录水印,通过onPeriodicEmit方法定时向下游发送水印的实现,构造参数maxOutOfOrderness是提供给记录乱序的,运行最大延迟间隔。MaxTimestamp是当前的水印记录。BoundedOutOfOrdernessWatermarks<T>的大致实现如下:

使用方法也十分的简单,提供的是一个静态方法,只需直接调用即可

WatermarkStrategy.<Map<String,Object>>forBoundedOutOfOrderness(Duration.ofSeconds(1))

最后结合项目,新版本的使用方法如下:

dataStream
    .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(delayTime))
      .withTimestampAssigner(new SerializableTimestampAssigner[String] {
        override def extractTimestamp(t: String, l: Long): Long = {
          val jSONObject = JSON.parseObject(t)
          jSONObject.getString("system_time").toLong
        }
      }))

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 23:49:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码