IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 开发测试 -> Window ApplyProcess FunctionReduceFunctionAggregateFunction分析 -> 正文阅读

[开发测试]Window ApplyProcess FunctionReduceFunctionAggregateFunction分析

本文全部来源于官方文档解释。

Window Apply?#

WindowedStream → DataStream?#

AllWindowedStream → DataStream?#

Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.

解释:将一个通用的函数应用到整个窗口。可见apply用于窗口函数之后。

举例:

windowedStream.apply(new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});
// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
    public void apply (Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

源码的角度分析:

接口函数windowFuntion中 定义了apply方法。

源码的注释为:计算窗口的值,并输出none或多个元素。 ?

总结:apply 是一个通用的函数应用于窗口中数据的计算,可以紧挨window? Funtion之后使用,另外在接口方法 WindowFunction中定义了apply方法,用户可以自定义apply对窗口中数据的处理规则。

Process Function?#

The ProcessFunction?#

ProcessFunction是一种底层流处理操作,可以访问所有(非循环)流应用程序的基本构建块:

  • 事件(流元素)
  • 状态(容错,一致,仅在键控流上)
  • 计时器(事件时间和处理时间,仅在键控流上)

可以将其ProcessFunction视为可以FlatMapFunction访问键控状态和计时器。它通过为输入流中接收到的每个事件调用来处理事件。

对于容错状态,ProcessFunction可以访问 Flink 的keyed state,可以通过 访问?RuntimeContext,类似于其他有状态函数访问 keyed state 的方式。

计时器允许应用程序对处理时间和事件时间的变化做出反应。对该函数的每次调用processElement(...)都会获得一个Context对象,该对象可以访问元素的事件时间时间戳和TimerService。可TimerService用于为将来的事件/处理时间瞬间注册回调。对于事件时间计时器,onTimer(...)当当前水印超过或超过计时器的时间戳时调用该方法,而对于处理时间计时器,onTimer(...)当挂钟时间达到指定时间时调用该方法。在该调用期间,所有状态再次限定为创建计时器的键,允许计时器操作键控状态。

如果要访问键控状态和计时器,则必须 ProcessFunction在键控流上应用:

他的具体介绍可以参照官网:Process Function | Apache Flinkicon-default.png?t=M1L8https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/process_function/

?总结:processFunction 是一个底层的处理流的函数:它能为用户提供三方面东西,1.元素2.状态3.计时器 可以让用户很方便的定义流数据的规则。一般我们通常使用ProcessFunction较多。

ReduceFunction?#

ReduceFunction指定输入中的两个元素如何组合生成相同类型的输出元素。?Flink使用ReduceFunction递增地聚合窗口的元素。 ?

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

AggregateFunction

AggregateFunction是ReduceFunction的一般化版本,

它有三种类型:

输入类型(IN)、

累加类型(ACC)

输出类型(OUT)。

?输入类型是输入流中元素的类型,AggregateFunction有一个将一个输入元素添加到累加器的方法。?该接口还提供了创建初始累加器、将两个累加器合并到一个累加器以及从累加器提取输出(类型为OUT)的方法。?我们将在下面的例子中看到它是如何工作的。 输入类型和输出类型可以不相同?

与ReduceFunction一样,Flink会在一个窗口的输入元素到达时递增地聚合它们。 ?

/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)

?

?

?

?

?

?

?

?

  开发测试 最新文章
pytest系列——allure之生成测试报告(Wind
某大厂软件测试岗一面笔试题+二面问答题面试
iperf 学习笔记
关于Python中使用selenium八大定位方法
【软件测试】为什么提升不了?8年测试总结再
软件测试复习
PHP笔记-Smarty模板引擎的使用
C++Test使用入门
【Java】单元测试
Net core 3.x 获取客户端地址
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:56:28  更:2022-03-10 22:56:45 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/18 2:51:49-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码