IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 游戏开发 -> Flink - Scala/Java trigger 简介与使用 -> 正文阅读

[游戏开发]Flink - Scala/Java trigger 简介与使用

一.引言

Flink 使用 windowAll 生成 AllwindowedStream 后调用 Trigger 执行窗口触发逻辑,下面对 Trigger 触发器做一个基本的了解。

二.Trigger 简介

Trigger 翻译为触发,扳机,其作用为在一定条件下触发窗口进行计算,如果是内部 operator 则执行对应 operator,如果自定义实现了 ProcessAllWindowFunction,则触发自定义执行逻辑。触发器决定窗口(由窗口赋值器形成)何时准备由窗口函数处理。每个WindowAssigner都有一个默认的触发器。如果默认触发器不符合您的需要,您可以使用trigger(…)指定一个自定义触发器。Trigger 多见于将大窗口的数据实时输出,例如针对 100s 的窗口,每 10s 触发一次数据执行窗口逻辑。

1.触发器方法

· onElement

public abstract TriggerResult onElement(T var1, long var2, W var4, Trigger.TriggerContext var5) throws Exception;

对于添加到窗口中的每个元素,都会调用onElement()方法。以最基础的 CountTrigger 为例,每当元素到达,对应 Trigger 类都会进行计数器累加和判断,如果到达数量累加至对应 count,则进行触发执行一次窗口逻辑。

· onEventTime

public abstract TriggerResult onEventTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

当注册的事件时间定时器触发时,调用onEventTime()方法。一般触发后会重置执行时间或注册下一次执行的 eventTime。

·?onProcessingTime

public abstract TriggerResult onProcessingTime(long var1, W var3, Trigger.TriggerContext var4) throws Exception;

当注册的处理时间计时器触发时,将调用onProcessingTime()方法。基本处理方式同上。

·?onMerge

public void onMerge(W window, Trigger.OnMergeContext ctx) throws Exception {
    throw new UnsupportedOperationException("This trigger does not support merging.");
}

onMerge()方法与有状态触发器相关,当它们对应的窗口合并时,可以合并两个触发器的状态,例如使用会话窗口时。两个窗口合并时,合并二者的状态值,可以看做是 reduce 函数,将 TimeWindow1 和 Timewindow2 的 state 变量合二为一。

·?clear

public abstract void clear(W var1, Trigger.TriggerContext var2) throws Exception;

最后,clear()方法执行删除相应窗口时所需的任何操作。以最基础的 CountTrigger 为例,clear 会清空计数器状态,即重新置为0。

· canMerge

    public boolean canMerge() {
        return false;
    }

trigger 是否支持 onMerge 方法合并二者状态。

2.触发器状态

onElement,onProcessTime,onEventTime 三个方法都会返回一个 TriggerResult,该类为枚举类,对应了执行方法后返回的窗口操作。

· TriggerResult.CONTINUE - 跳过,什么都不做

· TriggerResult.FIRE - 触发窗口计算

· TriggerResult.PURGE - 清除窗口元素

· TriggerResult.FIRE_AND_PURGE - 触发窗口操作,随后清空窗口元素

以 CountTrigger 为例,每攒够 Count 个元素,都会返回 TriggerResult.FIRE 执行窗口逻辑,而未够 Count 数量时则会 TiggerResult.CONTINUE。

3.Flink 自带 Trigger

?Flink?org.apache.flink.streaming.api.windowing.triggers 类下自带如下窗口触发器,如果需要自定义触发器则只需实现 Trigger 类的触发器方法,例如可以结合 CountTrigger 和 ProcessingTimeTrigger 实现基于处理条数和处理时间的双重触发器 CountAndProcessingTime Trigger。

ContinuousEventTimeTrigger连续事件时间触发器
ContinuousProcessingTimeTrigger连续处理时间触发器
CountTrigger计数触发器
DeltaTrigger阈值触发器
EventTimeTrigger事件时间触发器
ProcessingTimeoutTrigger处理时间超时触发器
ProcessingTimeTrigger处理时间触发器
PurgingTrigger强制 PURGE 触发器

三.API 示例

1.Scala 示例

下述示例将原始 DataStream 按10s进行滚动窗口聚合,其中 Trigger 设置为 CountTrigger,每满足 30 个元素进行一次触发。

    val allwindowedStream = dataStream
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .trigger(CountTrigger.of[TimeWindow](30L))
      .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
        override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
          val info = elements.toArray.mkString(",")
          out.collect(info)
        }
      }).setParallelism(1)
    allwindowedStream.print()

Tips:

Trigger 参数需要指定 implict T 即 of 后面的 [TimeWindow],这里如果添加对应数据的输出类型 T,则会报错?Required: Trigger[_ >: String,_ >: TimeWindow] :

Required: Trigger[_ >: String,_ >: TimeWindow]
Found: ContinuousProcessingTimeTrigger[String]

2.Java 示例

下述示例针对原始 DataStream 生成 10s 的滚动窗口,并且按连续处理时间每 5s 触发一次窗口的处理逻辑即 ProcessFunction。

       dataStream 
           .setParallelism(processParallel)
           .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
           .process(new ProcessFunction())
           .addSink(outputSink)
           .setParallelism(processParallel)
           .print()

四.总结与注意事项

1.默认触发器

基于 EventTime 的窗口默认使用 EventTimeTrigger,基于 ProcessTime 的窗口默认使用 ProcessingTimeTrigger

2.GlobalWindow

GlobalWindow的默认触发器是永不触发的NeverTrigger。因此,在使用GlobalWindow时,您总是必须定义一个自定义触发器。

3.窗口触发逻辑

一旦触发器确定窗口已经准备好进行处理,它就会触发,也就是说,它返回FIRE或FIRE_AND_PURGE。这是窗口操作符发出当前窗口结果的信号。给定一个带有ProcessWindowFunction的窗口,所有元素都被传递给ProcessWindowFunction。带有ReduceFunction或AggregateFunction的窗口只是发送它们的聚合结果。

4.FIRE AND PURGE

FIRE 触发窗口不清除窗口元素,PURGE 触发窗口但会清除窗口元素,进行自定义编辑时需要注意,避免窗口触发后损失一批数据,其次 PURGE 只会清除窗口的元素,窗口一些自定义的元数据和基本属性并不会清除。

  游戏开发 最新文章
6、英飞凌-AURIX-TC3XX: PWM实验之使用 GT
泛型自动装箱
CubeMax添加Rtthread操作系统 组件STM32F10
python多线程编程:如何优雅地关闭线程
数据类型隐式转换导致的阻塞
WebAPi实现多文件上传,并附带参数
from origin ‘null‘ has been blocked by
UE4 蓝图调用C++函数(附带项目工程)
Unity学习笔记(一)结构体的简单理解与应用
【Memory As a Programming Concept in C a
上一篇文章      下一篇文章      查看所有文章
加:2022-03-24 00:54:10  更:2022-03-24 00:54:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:40:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码