The previous post gave a brief introduction to a few basic Flink concepts. In this post we'll walk through a WordCount example to get a first look at how Flink actually does computation.
I'm using Maven to build the project. Add the Flink dependencies, along with a few others, to the pom file:
<properties>
    <flink.version>1.12.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
    <slf4j.version>1.7.30</slf4j.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
I'm using Flink 1.12 and Java 1.8. The Scala binary version is also specified because Flink's underlying communication layer uses Akka, which depends on Scala. Logging dependencies are included as well to make troubleshooting easier.
Create a log4j.properties file under resources and put the logging configuration in it:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Next, create a text file (the code below reads it from input/input.txt) and type in a few space-separated words; we'll use it in a moment.
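As an illustration, input/input.txt might look like the following. Any words will do; this particular sample just happens to match the counts in the output shown later:
hello flink
hello spark
hive slot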
Then create the corresponding class. We'll start by computing WordCount with Flink's batch API. The steps are:
- Get the execution environment
- Read the file from the given path
- Flatten each line into individual words
- Map each word to a tuple
- Group the tuples by key
- Aggregate (sum) within each group
- Print the result
The full code is as follows:
package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class Flink01_WordCount_Batch {

    public static void main(String[] args) throws Exception {
        // 1. Get the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input file
        DataSource<String> source = env.readTextFile("input/input.txt");

        // 3. Flatten each line into individual words
        FlatMapOperator<String, String> wordDS = source.flatMap(new SourceFlatMap());

        // 4. Map each word to a (word, 1) tuple
        MapOperator<String, Tuple2<String, Integer>> mapDS = wordDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        });

        // 5. Group by the word (position 0 of the tuple)
        UnsortedGrouping<Tuple2<String, Integer>> groupByDS = mapDS.groupBy(0);

        // 6. Sum the counts (position 1 of the tuple)
        AggregateOperator<Tuple2<String, Integer>> sum = groupByDS.sum(1);

        // 7. Print the result
        sum.print();
    }

    // Splits each line on spaces and emits every word
    public static class SourceFlatMap implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word);
            }
        }
    }
}
The console output is as follows:
(hive,1)
(flink,1)
(hello,2)
(slot,1)
(spark,1)
Next, let's do WordCount with stream processing. Stream processing covers both bounded and unbounded streams; we'll write the bounded version first.
The overall approach is the same as the batch version, so I won't repeat it; here is the code.
package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Flink02_WordCount_Bounded {

    public static void main(String[] args) throws Exception {
        // 1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the input file as a (bounded) stream
        DataStreamSource<String> source = env.readTextFile("input/input.txt");

        // 3. Flatten each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = source.flatMap(new SourceTupleToFlatMap());

        // 4. Key the stream by the word
        KeyedStream<Tuple2<String, Integer>, String> keyDS = tupleDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // 5. Sum the counts (position 1 of the tuple) and print the running result
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyDS.sum(1);
        sum.print();

        // 6. Start the job
        env.execute();
    }

    // Splits each line on spaces and emits a (word, 1) tuple for every word
    public static class SourceTupleToFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
The output is as follows:
1> (spark,1)
2> (hello,1)
4> (slot,1)
2> (hello,2)
1> (hive,1)
4> (flink,1)
From the running count for the word hello you can see that Flink really does compute record by record: each incoming record updates the aggregate as it arrives.
You might ask: what does the number in front of each result mean?
That number is tied to the machine's CPU core count. The largest number here is 4 because my machine has 4 cores, and by default the parallelism of the whole job equals the number of cores; the prefix indicates which parallel subtask produced the result.
Can the parallelism be changed?
Of course. Calling env.setParallelism(1); sets the global parallelism. With a parallelism of 1, the output looks like this:
(hello,1)
(spark,1)
(hello,2)
(hive,1)
(flink,1)
(slot,1)
Finally, let's compute WordCount on an unbounded stream, which is what you'll almost always be dealing with in production. For the unbounded version I'll read data from a socket port; the rest of the code is the same.
package com.bigdata_world;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Flink03_WordCount_Unbounded {

    public static void main(String[] args) throws Exception {
        // 1. Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Set the global parallelism to 1
        env.setParallelism(1);

        // 3. Read an unbounded stream from a socket
        DataStreamSource<String> source = env.socketTextStream("localhost", 9999);

        // 4. Flatten each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = source.flatMap(new SourceTupleToFlatMap());

        // 5. Key the stream by the word
        KeyedStream<Tuple2<String, Integer>, String> keyDS = tupleDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // 6. Sum the counts and print the running result
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = keyDS.sum(1);
        sum.print();

        // 7. Start the job
        env.execute();
    }

    // Splits each line on spaces and emits a (word, 1) tuple for every word
    public static class SourceTupleToFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}
Open a terminal, run nc -lk 9999, start the job, and type some space-separated words. The output looks like this (the (,1) entry comes from an extra space, which split(" ") turns into an empty word):
(hello,1)
(,1)
(hello,2)
(hive,1)
(hive,2)
(spark,1)
(kafka,1)
Those are the different ways to write WordCount. A quick summary:
- Batch processing uses ExecutionEnvironment as its execution environment, while stream processing uses StreamExecutionEnvironment.
- For grouping, batch processing uses groupBy; stream processing has no groupBy and uses keyBy instead.
- The parallelism is tied to the number of CPU cores and defaults to the core count; a global parallelism can also be set explicitly.
- A batch job does not need to be started explicitly, while a streaming job must call execute() at the end to start.
If you're interested, you can follow the WeChat official account: bigdata_world