IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink DataStream API(五)用户自定义函数 -> 正文阅读

[大数据]flink DataStream API(五)用户自定义函数

用户自定义函数

大多数操作都需要用户自定义函数。我们还介绍了Accumulators,可用于深入了解您的 Flink 应用程序。

实现一个接口

最基本的方法之一是实现提供的接口:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};
data.map(new MyMapFunction());

匿名类

您可以将函数作为匿名类传递:

data.map(new MapFunction<String, Integer> () {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Java 8 Lambdas

Flink 在 Java API 中也支持 Java 8 Lambdas。

data.filter(s -> s.startsWith("http://"));
data.reduce((i1,i2) -> i1 + i2);

Rich functions

所有需要用户定义函数的转换都可以将Rich functions作为参数,而不是以下形式:

class MyMapFunction implements MapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};

你可以这样写:

class MyMapFunction extends RichMapFunction<String, Integer> {
  public Integer map(String value) { return Integer.parseInt(value); }
};

并像往常一样将函数传递给map转换:

data.map(new MyMapFunction());

Rich functions也可以定义为匿名类:

data.map (new RichMapFunction<String, Integer>() {
  public Integer map(String value) { return Integer.parseInt(value); }
});

Rich functions函数除了提供用户自定义函数(map、reduce等)之外,还提供了四种方法:open、close、getRuntimeContext、setRuntimeContext。它们对于参数化函数(请参见将参数传递给函数)、创建和最终本地状态、访问广播变量(请参见 Broadcast Variables)以及访问运行时信息(如Accumulators and Counters)和迭代信息(Iterations)非常有用。

累加器和计数器

累加器是带有加法运算和最终累加结果的简单结构,可在作业结束后使用。

最直接的累加器是计数器:您可以使用 Accumulator.add(V value) 方法向累加器中添加元素,在作业结束时,Flink 会将所有元素汇总(合并)并将结果发送给客户端。累加器在调试期间或您如果想快速了解有关数据的更多信息时很有用。

Flink 目前内置了以下累加器。它们中的每一个都实现了 Accumulator 接口。

  • IntCounterLongCounterDoubleCounter :有关使用累加器的示例,请参见下文。
  • Histogram:离散数量的容器的直方图实现。在内部,它只是一个从Integer到Integer的映射。您可以使用它来计算值的分布,例如字计数程序的每行字的分布。

如何使用累加器:

首先,您必须在要使用它的用户定义的转换函数中创建一个累加器对象(这里是一个计数器)。

private IntCounter numLines = new IntCounter();

其次,您必须注册累加器对象,通常在Rich functionopen() 方法中。您还可以在此处定义名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

您现在可以在算子函数的任何位置使用累加器,包括在 open()close() 方法中。

this.numLines.add(1);

最终结果将存储在执行环境的execute()方法返回的JobExecutionResult对象中(目前只有在执行等待作业完成时才有效)。

myJobExecutionResult.getAccumulatorResult("num-lines")

所有累加器在每个作业中共享一个名称空间。因此,您可以在作业的不同算子函数中使用相同名称的累加器。Flink将在内部合并所有具有相同名称的累加器。

关于累加器和迭代的注意事项:目前,累加器的结果只有在整个工作结束后才可用。我们还计划在下一次迭代中提供上一次迭代的结果。您可以使用聚合器来计算每次迭代的统计数据,并根据此类统计数据决定迭代的终止。

自定义累加器

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-21 15:32:29  更:2021-08-21 15:32:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:09:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码