IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> (五)learning flink-1 -> 正文阅读

[大数据](五)learning flink-1

flink 官网

Flink概览

本文的重点是提供Flink用于管理状态和时间的api的简单介绍,希望掌握了这些基础知识后,您就能更好地从更详细的参考文档中获取所需的其他知识。每个部分末尾的链接将引导您到您可以学习更多的地方。

要注意以下几点:

  • 如何实现流数据处理管道
  • 如何以及为什么Flink管理状态
  • 如何使用事件时间来一致地计算准确的分析
  • 如何在连续流上构建事件驱动的应用程序
  • Flink如何能够提供容错,有状态的流处理恰好一次语义

本文侧重于四个关键概念:流数据的连续处理、事件时间、有状态流处理和状态快照。

流处理

流是数据的自然栖息地。无论是来自web服务器的事件,股票交易所的交易,还是来自工厂车间机器的传感器读数,数据都是作为流的一部分创建的。但是,当您分析数据时,您可以围绕有界或无界的流组织您的处理,您选择的这些范例将产生深远的影响。
在这里插入图片描述
批处理是处理有界数据流时的工作范式。在这种操作模式下,您可以选择在产生任何结果之前吸收整个数据集,这意味着,例如,可以对数据进行排序、计算全局统计数据或生成汇总所有输入的最终报告。

另一方面,流处理涉及到未绑定的数据流。至少从概念上讲,输入可能永远不会结束,因此必须在数据到达时不断地处理它。

在Flink中,应用程序由可由用户定义的操作符转换的流数据流组成。这些数据流形成有向图,以一个或多个源开始,以一个或多个汇聚结束。
在这里插入图片描述
通常,程序中的转换与数据流中的操作符之间存在一对一的对应关系。然而,有时一个转换可能包含多个操作符。

一个应用程序可以使用来自流媒体的实时数据,比如消息队列或分布式日志,比如Apache Kafka或Kinesis。但是flink也可以使用来自各种数据源的有限的历史数据。类似地,由Flink应用程序产生的结果流可以发送到各种系统,这些系统可以作为接收器连接。
在这里插入图片描述

并行数据流

Flink中的程序本质上是并行的和分布式的。在执行过程中,一个流有一个或多个流分区,每个操作符有一个或多个操作符子任务。操作符子任务彼此独立,在不同的线程中执行,也可能在不同的机器或容器上执行。

运算符子任务的数量就是该运算符的并行度。同一程序的不同运算符可能具有不同级别的并行性。
在这里插入图片描述
流可以在两个操作符之间以一对一(或转发)的模式传输数据,也可以采用重分发模式:

  • 一对一的流(例如上图中的Source和map()操作符之间)保留了元素的划分和顺序。这意味着map()操作符的子任务[1]将看到与Source操作符的子任务[1]产生的相同顺序的相同元素。
  • 重新分配流(如上面的map()和keyBy/window之间,以及keyBy/window和Sink之间)会改变流的分区。 每个运算符子任务将数据发送到不同的目标子任务,具体取决于所选的转换。 例如keyBy()(通过对键进行散列重新分区)、broadcast()或rebalance()(随机重新分区)。 在重分配交换中,元素之间的顺序只保留在每对发送和接收子任务中(例如,map()的子任务[1]和keyBy/window的子任务[2])。 因此,例如,上面所示的keyBy/window和Sink操作符之间的重新分配引入了关于不同键的聚合结果到达Sink的顺序的不确定性。

Timely Stream Processing

对于大多数流媒体应用程序来说,能够使用用于处理实时数据的相同代码重新处理历史数据并产生确定的、一致的结果是非常有价值的。

关注事件发生的顺序,而不是它们交付处理的顺序,以及能够推断一组事件何时完成(或应该何时完成),也可能是至关重要的。例如,考虑电子商务交易或金融交易中涉及的一组事件。

这些对及时流处理的要求可以通过使用记录在数据流中的事件时间时间戳来满足,而不是使用处理数据的机器的时钟。

Stateful Stream Processing

Flink的操作可以是有状态的。这意味着如何处理一个事件取决于在它之前发生的所有事件的累计效果。State可以用于简单的事情,比如在仪表板上显示每分钟的事件计数,也可以用于更复杂的事情,比如欺诈检测模型的计算特性。
Flink应用程序在分布式集群上并行运行。给定操作符的各种并行实例将在单独的线程中独立执行,通常将在不同的机器上运行。

有状态操作符的并行实例集实际上是一个分片的键值存储。每个并行实例负责处理特定键组的事件,这些键的状态保存在本地。

下图显示了一个作业在作业图中的前三个操作符上以并行度为2的方式运行,并在并行度为1的接收中终止。第三个操作符是有状态的,可以看到在第二个和第三个操作符之间发生了完全连接的网络shuffle。这样做是为了按某个键划分流,以便所有需要一起处理的事件都将被处理。
在这里插入图片描述
状态总是在本地访问,这有助于Flink应用程序实现高吞吐量和低延迟。您可以选择将状态保存在JVM堆上,或者如果状态太大,则将其保存在有效组织的磁盘数据结构中。
在这里插入图片描述

通过状态快照实现容错

Flink能够通过状态快照和流重放的组合提供容错、恰好一次的语义。这些快照捕获分布式管道的整个状态,将偏移量记录到输入队列中,以及在整个作业图中由于接收到该点的数据而产生的状态。当发生故障时,源将被重绕,状态将恢复,处理将恢复。如上所述,这些状态快照是异步捕获的,不会妨碍正在进行的处理。

DataStream API

怎么能流化呢?

Flink的面向Java和Scala的DataStream api可以让你流任何可以序列化的东西。使用Flink自己的序列化器:

  • 基本类型,例如String, Long, Integer, Boolean;
  • Array复合类型:元组,pojo和Scala case类

Flink对于其他类型回调到Kryo。在Flink中也可以使用其他序列化器。特别是Avro,它得到了很好的支持。

Java tuples and POJOs

Flink的native序列化器可以在元组和pojo上高效地操作。

Tuples

java api ,tuple定义了Tuple0 到Tuple25个类型,有自定义实现接口,可自定义

Tuple2<String, Integer> person = Tuple2.of("Fred", 35);

// zero based index!  
String name = person.f0;
Integer age = person.f1;

POJOs

如果满足以下条件,Flink将数据类型识别为POJO类型(并允许“by-name”字段引用):

  • 该类是公共的独立类(没有非静态内部类)
  • 该类有一个公共的无参数构造函数
  • 类(以及所有超类)中的所有非静态、非瞬态字段要么是公共的(非final的),要么具有公共的getter和setter方法,这些方法遵循Java bean对getter和setter的命名约定。

举例:

public class Person {
    public String name;  
    public Integer age;  
    public Person() {}
    public Person(String name, Integer age) {  
        . . .
    }
}  

Person person = new Person("Fred Flintstone", 35);

Scala tuples and case classes

这些工作正如您所期望的那样。

举一个栗子

这个示例将关于人的记录流作为输入,并过滤它以只包括成年人。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;

public class Example {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));

        DataStream<Person> adults = flintstones.filter(new FilterFunction<Person>() {
            @Override
            public boolean filter(Person person) throws Exception {
                return person.age >= 18;
            }
        });

        adults.print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;
        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

Stream 执行环境

每个Flink应用程序都需要一个执行环境,本例中为env。流应用程序需要使用StreamExecutionEnvironment。

在应用程序中进行的DataStream API调用将构建一个附加到StreamExecutionEnvironment的作业图。当调用env.execute()时,这个图被打包并发送到JobManager, JobManager将作业并行化,并将其分片分发给任务管理器执行。作业的每个并行部分将在一个任务槽中执行。

注意,如果不调用execute(),应用程序将无法运行。
在这里插入图片描述
这个分布式运行时取决于您的应用程序是可序列化的。它还要求集群中的每个节点都可以使用所有依赖项。

basic stream sources

上面的例子使用env.fromElements(…)构造了一个DataStream。这是在原型或测试中拼凑一个简单流的方便方法。在StreamExecutionEnvironment上还有一个fromCollection(Collection)方法。所以,你可以这样做:

List<Person> people = new ArrayList<Person>();

people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));

DataStream<Person> flintstones = env.fromCollection(people);

在原型设计时,另一种方便的方法是使用套接字:

DataStream<String> lines = env.socketTextStream("localhost", 9999);

或者读文件:

DataStream<String> lines = env.readTextFile("file:///path");

在实际应用程序中,最常用的数据源是那些支持低延迟、高吞吐量的并行读取,并结合了倒带和重放(高性能和容错的先决条件)的数据源,例如Apache Kafka、Kinesis和各种文件系统。REST api和数据库也经常用于流的充实。

Basic stream sinks

上面的例子使用adult .print()将其结果打印到任务管理器日志中(当在IDE中运行时,它将出现在IDE的控制台中)。这将在流的每个元素上调用toString()。

类似于这样子的结果:

1> Fred: age 35
2> Wilma: age 35

其中1>和2>表示哪个子任务(即线程)产生了输出。
在生产环境中,常用的接收器包括StreamingFileSink、各种数据库和pub-sub-子系统。

Debugging

在生产环境中,您的应用程序将运行在远程集群或一组容器中。 如果失败,它会远程失败。 JobManager和TaskManager日志在调试这类故障时非常有用,但是在IDE中进行本地调试要容易得多,这也是Flink所支持的。 您可以设置断点、检查局部变量和逐步执行代码。 您还可以进入Flink的代码,如果您对Flink的工作原理感到好奇,这是了解更多关于其内部原理的好方法。

亲自动手

至此,您已经足够了解如何开始编写和运行一个简单的DataStream应用程序。克隆flink-training-repo,按照README中的说明进行第一个练习:筛选流

数据管道& ETL

Apache Flink的一个非常常见的用例是实现ETL(提取、转换、加载)管道,该管道从一个或多个源获取数据,执行一些转换和/或充实,然后将结果存储在某处。在本节中,我们将看看如何使用Flink的DataStream API来实现这类应用程序。

请注意,Flink的Table和SQL api非常适合许多ETL用例。但是,无论您最终是否直接使用DataStream API,对本文介绍的基础知识有一个坚实的理解都是有价值的。

无状态转换算子

本节介绍map()和flatmap(),它们是用于实现无状态转换的基本操作。本节中的示例假设您熟悉在flink-training-repo的实际操作练习中使用的Taxi Ride数据。

map()

在第一个练习中,您过滤了出租车乘坐事件流。在相同的代码基中,有一个GeoUtils类,它提供一个静态方法GeoUtils。mapToGridCell(float lon, float lat),它将一个位置(经度,纬度)映射到一个网格单元格,该网格单元格指的是一个大小约为100x100米的区域。

public static class EnrichedRide extends TaxiRide {
    public int startCell;
    public int endCell;

    public EnrichedRide() {}

    public EnrichedRide(TaxiRide ride) {
        this.rideId = ride.rideId;
        this.isStart = ride.isStart;
        ...
        this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
        this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
    }

    public String toString() {
        return super.toString() + "," +
            Integer.toString(this.startCell) + "," +
            Integer.toString(this.endCell);
    }
}

然后,您可以创建一个转换流的应用程序

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .filter(new RideCleansingSolution.NYCFilter())
    .map(new Enrichment());

enrichedNYCRides.print();

Enrichment.java实现MapFunction:

public static class Enrichment implements MapFunction<TaxiRide, EnrichedRide> {

    @Override
    public EnrichedRide map(TaxiRide taxiRide) throws Exception {
        return new EnrichedRide(taxiRide);
    }
}

flatmap()

MapFunction仅适用于执行一对一转换:对于每个传入的流元素,map()将发出一个转换后的元素。否则,您将需要使用flatmap()

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));

DataStream<EnrichedRide> enrichedNYCRides = rides
    .flatMap(new NYCEnrichment());

enrichedNYCRides.print();

NYCEnrichment实现 FlatMapFunction:

public static class NYCEnrichment implements FlatMapFunction<TaxiRide, EnrichedRide> {

    @Override
    public void flatMap(TaxiRide taxiRide, Collector<EnrichedRide> out) throws Exception {
        FilterFunction<TaxiRide> valid = new RideCleansing.NYCFilter();
        if (valid.filter(taxiRide)) {
            out.collect(new EnrichedRide(taxiRide));
        }
    }
}

有了这个接口中提供的Collector, flatmap()方法可以发出任意多的流元素,甚至不包含任何元素。

Keyed Streams

keyBy()

能够围绕流的一个属性划分流通常是非常有用的,这样具有该属性相同值的所有事件都被分组在一起。例如,假设您想要在每个网格单元中找到最长的出租车车程。从SQL查询的角度考虑,这意味着在startCell中执行某种GROUP BY,而在Flink中这是通过keyBy(KeySelector)完成的。

rides
    .flatMap(new NYCEnrichment())
    .keyBy(enrichedRide -> enrichedRide.startCell);

每个keyBy都会导致网络shuffle,重新划分流。通常,这是相当昂贵的,因为它涉及到网络通信以及序列化和反序列化。
在这里插入图片描述

键选择器并不局限于从事件中提取键。相反,它们可以以任何您想要的方式计算键,只要结果键是确定性的,并且具有hashCode()和equals()的有效实现。这一限制排除了生成随机数或返回数组或枚举的KeySelectors,但是你可以使用tuple或pojo拥有复合键,例如,只要它们的元素遵循这些规则。

键必须以确定的方式生成,因为它们在需要的时候会重新计算,而不是附加到流记录。

例如,我们不创建一个新的带有startCell字段的EnrichedRide类,然后将其用作key 如下表示:

keyBy(enrichedRide -> enrichedRide.startCell);

我们可以这样做:

keyBy(ride -> GeoUtils.mapToGridCell(ride.startLon, ride.startLat));

Aggregations on Keyed Streams

这段代码创建了一个新的元组流,包含startCell和每个结束事件的持续时间(以分钟为单位):

import org.joda.time.Interval;

DataStream<Tuple2<Integer, Minutes>> minutesByStartCell = enrichedNYCRides
    .flatMap(new FlatMapFunction<EnrichedRide, Tuple2<Integer, Minutes>>() {

        @Override
        public void flatMap(EnrichedRide ride,
                            Collector<Tuple2<Integer, Minutes>> out) throws Exception {
            if (!ride.isStart) {
                Interval rideInterval = new Interval(ride.startTime, ride.endTime);
                Minutes duration = rideInterval.toDuration().toStandardMinutes();
                out.collect(new Tuple2<>(ride.startCell, duration));
            }
        }
    });

现在可以生成一个流,它只包含那些为每个startCell所见过(到那时为止)最长的骑乘。

字段作为键的表达方式有很多种。前面您看到了一个EnrichedRide POJO的示例,其中用作键的字段是用它的名称指定的。这种情况涉及Tuple2对象,元组中的索引(从0开始)用于指定键。

minutesByStartCell
  .keyBy(value -> value.f0) // .keyBy(value -> value.startCell)
  .maxBy(1) // duration
  .print();

现在,每当持续时间达到新的最大值时,输出流包含每个键的一条记录-如单元格50797所示:

...
4> (64549,5M)
4> (46298,18M)
1> (51549,14M)
1> (53043,13M)
1> (56031,22M)
1> (50797,6M)
...
1> (50797,8M)
...
1> (50797,11M)
...
1> (50797,12M)

隐式状态

这是本文中涉及有状态流的第一个例子。 虽然状态是透明处理的,但Flink必须跟踪每个不同密钥的最大持续时间。

每当应用程序涉及到状态时,您都应该考虑状态可能会变得多大。 只要键空间是无界的,那么Flink需要的状态量也是无界的。

当处理流时,通常更有意义的是考虑有限窗口上的聚合,而不是整个流。

reduce() and other aggregators

上面使用的maxBy()只是Flink的KeyedStreams上可用的许多聚合器函数的一个例子。 还有一个更通用的reduce()函数,您可以使用它来实现自己的自定义聚合。

有状态算子

为什么Flink参与状态管理?

你的应用程序当然可以在不需要Flink管理的情况下使用状态,但是Flink为它管理的状态提供了一些引人注目的特性:

  • 本地化: Flink状态保持在处理它的机器的本地,可以以内存速度访问
  • 持久化:Flink状态具有容错性,即定时自动检查点,故障时恢复
  • 垂直扩展:Flink状态可以保存在嵌入的RocksDB实例中,通过添加更多本地磁盘进行扩展
  • 水平扩展:Flink状态会随着集群的增长和收缩而重新分布
  • 可查询的:可以通过queryable state API从外部查询Flink状态。

在本节中,您将学习如何使用管理键控状态的Flink api。

富函数

到目前为止,您已经看到了Flink的几个函数接口,包括FilterFunction、MapFunction和FlatMapFunction。这些都是单一抽象方法模式的例子。

对于这些接口,Flink还提供了一个所谓的“富”变体,例如RichFlatMapFunction,它有一些额外的方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

Open()在操作符初始化期间调用一次。这是加载一些静态数据或打开到外部服务的连接的机会。

getRuntimeContext()提供了对一套可能有趣的东西的访问,但最值得注意的是如何创建和访问由Flink管理的状态。

举一个Keyed State栗子

在本例中,假设您有一个想要重复的事件流,因此您只保留每个键的第一个事件。这是一个应用程序,使用RichFlatMapFunction称为Deduplicator:

private static class Event {
    public final String key;
    public final long timestamp;
    ...
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  
    env.addSource(new EventSource())
        .keyBy(e -> e.key)
        .flatMap(new Deduplicator())
        .print();
  
    env.execute();
}

要做到这一点,重复数据删除器需要以某种方式记住每个键是否已经发生了该键的事件。它将使用Flink的键控状态接口来实现这一点。

当您使用这样的键控流时,Flink将为所管理的每个状态项维护一个键/值存储。

Flink支持几种不同类型的键控状态,本例使用了最简单的一种,即ValueState。这意味着对于每个键,Flink将存储单个对象—在本例中是一个布尔类型的对象。

我们的Deduplicator类有两个方法:open()和flatMap()。open方法通过定义ValueStateDescriptor来建立托管状态的使用。构造函数的参数为键状态项指定一个名称(" keyHasBeenSeen "),并提供可用于序列化这些对象的信息(在本例中是Types.BOOLEAN)。

public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
    ValueState<Boolean> keyHasBeenSeen;

    @Override
    public void open(Configuration conf) {
        ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
        keyHasBeenSeen = getRuntimeContext().getState(desc);
    }

    @Override
    public void flatMap(Event event, Collector<Event> out) throws Exception {
        if (keyHasBeenSeen.value() == null) {
            out.collect(event);
            keyHasBeenSeen.update(true);
        }
    }
}

当flatMap方法调用keyHasBeenSeen.value()时,Flink的运行时在上下文中查找这个键的状态值,只有当它为null时,它才会继续收集事件到输出。在本例中,它还更新keyHasBeenSeen为true。

这种访问和更新键分区状态的机制可能看起来相当神奇,因为在我们的重复数据删除器的实现中,键不是显式可见的。当Flink的运行时调用RichFlatMapFunction的open方法时,没有事件,因此此时上下文中没有键。但是当它调用flatMap方法时,正在处理的事件的键对运行时可用,并在幕后用于确定正在操作Flink的状态后端中的哪个条目。

当部署到分布式集群时,这个重复数据删除器将有许多实例,每个实例将负责整个密钥空间的一个不相交的子集。因此,当您看到ValueState的单个项时,例如

ValueState <Boolean> keyHasBeenSeen;

理解这不仅仅代表一个布尔值,而是一个分布式的、分片的、键/值存储。

Clearing State

上面的例子有一个潜在的问题:如果键空间是无界的,会发生什么?Flink为每个使用的键存储一个布尔值实例。如果有一个有界的键集,那么这是可以的,但在键集以无界的方式增长的应用程序中,有必要清除不再需要的键的状态。这可以通过在状态对象上调用clear()来实现,如下所示:

keyHasBeenSeen.clear();

例如,您可能希望在给定键一段时间不活动之后执行此操作。当您在事件驱动的应用程序一节中学习processfunction时,您将看到如何使用计时器来完成此任务。

还有一个State Time-to-Live (TTL)选项,您可以使用状态描述符配置它,以指定您希望何时自动清除失效键的状态。

Non-keyed State

也可以在非键控上下文中使用托管状态。这有时被称为算子态。所涉及的接口有些不同,因为用户定义函数不太可能需要非键状态,所以这里不介绍。这个特性最常用于sources and sinks的实现。

Connected Streams

有时不像这样应用预定义的转换:
在这里插入图片描述
您希望能够动态地改变转换的某些方面—通过输入阈值、规则或其他参数。Flink中支持这个的模式叫做连接流,其中一个操作符有两个输入流,像这样:
在这里插入图片描述
connect steam 也可以用来实现流连接。

举一个栗子

在这个例子中,一个控制流被用来指定必须从streamOfWords中过滤出来的单词。一个叫做ControlFunction的RichCoFlatMapFunction被应用到连接的流来完成这个任务。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> control = env
        .fromElements("DROP", "IGNORE")
        .keyBy(x -> x);

    DataStream<String> streamOfWords = env
        .fromElements("Apache", "DROP", "Flink", "IGNORE")
        .keyBy(x -> x);
  
    control
        .connect(streamOfWords)
        .flatMap(new ControlFunction())
        .print();

    env.execute();
}

注意,连接的两个流必须以兼容的方式进行键控。keyBy的作用是对流的数据进行分区,当连接有键控的流时,它们必须以同样的方式进行分区。这确保了具有相同键的两个流中的所有事件都被发送到相同的实例。例如,这使得连接该键上的两个流成为可能。

在本例中,两个流都是DataStream类型,并且两个流都由字符串进行键控。正如你将在下面看到的,这个RichCoFlatMapFunction在键状态下存储一个布尔值,这个布尔值由两个流共享。

public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
    private ValueState<Boolean> blocked;
      
    @Override
    public void open(Configuration config) {
        blocked = getRuntimeContext()
            .getState(new ValueStateDescriptor<>("blocked", Boolean.class));
    }
      
    @Override
    public void flatMap1(String control_value, Collector<String> out) throws Exception {
        blocked.update(Boolean.TRUE);
    }
      
    @Override
    public void flatMap2(String data_value, Collector<String> out) throws Exception {
        if (blocked.value() == null) {
            out.collect(data_value);
        }
    }
}

RichCoFlatMapFunction是一种FlatMapFunction,可以应用于一对连接的流,它可以访问富函数接口。这意味着它可以是有状态的。

被阻塞的布尔值被用来记住控制流中提到的关键字(在本例中是单词),并且这些单词被从streamOfWords流中过滤出来。这是键状态,它在两个流之间共享,这就是为什么两个流必须共享相同的键空间。

Flink运行时使用来自两个连接流的元素调用flatMap1和flatMap2—在我们的示例中,来自控制流的元素被传递给flatMap1,来自streamOfWords的元素被传递给flatMap2。这是由control.connect(streamOfWords)连接两个流的顺序决定的。

重要的是要认识到您无法控制flatMap1和flatMap2回调的调用顺序。这两个输入流相互竞争,Flink运行时将按照它想要的方式从一个流或另一个流中消费事件。在时间和/或顺序问题的情况下,您可能会发现有必要在托管的Flink状态中缓冲事件,直到应用程序准备好处理它们。(注意:如果你真的很绝望,可以对双输入操作符使用的顺序施加一些有限的控制)。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 19:02:54  更:2022-05-21 19:03:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 19:50:50-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码