前言
????????Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
一、流、批处理
????????有界流:有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。
????????无界流:有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。
????????流处理:无界流通常被称为流处理。当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。数据是一条一条的处理,实时性高。
????????批处理:有界流处理通常被称为批处理。当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的一批数据通过网络传输到下一个节点。数据是一批一批处理,实时性低。
二、hello world
1.环境
jdk11 + flink1.14.4
引入flink 依赖
<!-- https:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.14.4</version>
</dependency>
<!-- https:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.14.4</version>
</dependency>
<!-- https:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.14.4</version>
</dependency>
2.流处理
@Test
public void streamingTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.fromCollection(List.of("nacos,python,java", "nacos,scripts,php", "nacos,java,springmvc", "nacos,sentinel,gateway"))
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
Arrays.stream(value.split(",")).forEach(v -> out.collect(v));
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
.sum(1)
.print();
env.execute("flink streaming hello word");
}
执行结果:
6> (springmvc,1)
3> (python,1)
2> (java,1)
2> (java,2)
1> (scripts,1)
4> (php,1)
8> (nacos,1)
8> (nacos,2)
8> (nacos,3)
8> (nacos,4)
8> (sentinel,1)
8> (gateway,1)
可以看出,数据进行的是流处理:一条一条处理,一条一条打印
3.批处理
将执行模式修改为 BATCH
@Test
public void batchTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
...
结果:
2> (java,2)
6> (springmvc,1)
3> (python,1)
8> (nacos,4)
1> (scripts,1)
4> (php,1)
8> (gateway,1)
8> (sentinel,1)
可以看出,数据进行的是批处理:数据处理完后,打印了统计结果,没有中间数据打印
4.自动处理
将执行模式修改为 AUTOMATIC
@Test
public void automicTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
...
结果跟批处理一样,因为数据源是从集合中获取数据,数据是有界的,自动按照批处理模式进行处理。
三、文件数据源
1.从文件中读取
file.txt 文件中的内容:
java,python,c++
java,python,c#
java,c++,php
@Test
public void fileTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.readTextFile("E:\\tmp\\flink\\file.txt")
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
Arrays.stream(value.split(",")).forEach(v -> out.collect(v));
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
.sum(1)
.print();
env.setParallelism(1);
env.execute("flink streaming hello word1");
}
结果:一行一行读取
5> (c#,1)
2> (java,3)
4> (php,1)
3> (c++,2)
3> (python,2)
2.从目录中读取
目录中有两个文件,
文件1内容:
java,python,c++
java,python,c#
java,c++,php
文件2内容:
java,python
java,python
java,python
@Test
public void dirTest() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.readTextFile("E:\\tmp\\flink")
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
Arrays.stream(value.split(",")).forEach(v -> out.collect(v));
}
})
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
})
.keyBy((KeySelector<Tuple2<String, Integer>, String>) value -> value.f0)
.sum(1)
.print();
env.setParallelism(1);
env.execute("flink streaming hello word1");
}
结果:
3> (c++,2)
3> (python,5)
5> (c#,1)
4> (php,1)
2> (java,6)
|