IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink 窗口分配器 时间窗口 -> 正文阅读

[大数据]Flink 窗口分配器 时间窗口

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个 WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调 用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。

1. 时间窗口

????????时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。?
????????标准的声明方式就是直接调用.window(),在里面传 入对应时间语义下的窗口分配器。我们不需要专门定义时间语义,默认就是事件时 间;如果想用处理时间,那么在这里传入处理时间的窗口分配器就可以了。?

(1)滚动处理时间窗口

窗口分配器由类 TumblingProcessingTimeWindows 提供,需要调用它的静态方法.of()。
?

stream.keyBy(...)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(...)

????????这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建 了一个长度为 5 秒的滚动窗口。?
????????另外,.of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。第一个参 数当然还是窗口大小,第二个参数则表示窗口起始点的偏移量。这里需要多做一些解释:对于 我们之前的定义,滚动窗口其实只有一个 size 是不能唯一确定的。比如我们定义 1 天的滚动 窗口,从每天的 0 点开始计时是可以的,统计的就是一个自然日的所有数据;而如果从每天的 凌晨 2 点开始计时其实也完全没问题,只不过统计的数据变成了每天 2 点到第二天 2 点。这个 起始点的选取,其实对窗口本身的类型没有影响;而为了方便应用,默认的起始点时间戳是窗 口大小的整倍数。也就是说,如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时 的窗口,默认就从整点开始。而如果我们非要不从这个默认值开始,那就可以通过设置偏移量 offset 来调整。

(2)滑动处理时间窗口

????????窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

stream.keyBy(...)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

????????这里.of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小, 后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗 口。
????????滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量,用法与滚动窗口完全 一致。

(3)处理时间会话窗口

????????窗口分配器由类 ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。

stream.keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

????????这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最 小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

.window(ProcessingTimeSessionWindows.withDynamicGap(new
SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
	@Override
	public long extract(Tuple2<String, Long> element) {
		// 提取 session gap 值返回, 单位毫秒
		return element.f0.length() * 1000;
	}
}))

????????这里.withDynamicGap()方法需要传入一个 SessionWindowTimeGapExtractor 作为参数,用 来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长 度乘以 1000 作为会话超时的间隔。?

(4)滚动事件时间窗口

????????窗口分配器由类 TumblingEventTimeWindows 提供,用法与滚动处理事件窗口完全一致。

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(...)

????????这里.of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。

(5)滑动事件时间窗口

????????窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

(6)事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:15:14  更:2022-05-07 11:16:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 7:54:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码