Flink任务执行总体流程
本文描述一个Flink任务执行的总体流程的前半部分,执行环境的选择。
Flink任务的一个例子
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
Flink任务执行总体步骤
(1)创建一个StreamExecutionEnvironment对象env,也就是流任务的执行环境对象。
(2)调用数据接入函数来接入数据源的数据,流执行环境支持的数据源有:数值集合,文件,socket数据流。支持的模式有:一次性读取,和持续间隔的读取数据。通过数据接入函数,得到一个DataStreamSource对象,这是一个DataSream的子类,表示它是一个流数据的接入源。有了数据接入的源,就可以在该DataStream上执行各种转换(transformation)和处理操作了。
(3)在得到的数据源DataStream(实际上是一个DataStreamSource)对象上执行各种转换操作,比如:map,flatMap,ByKey等。这一步,在实现层面这些操作其实并没有执行,而是把这些操作添加到流执行环境env的转换操作数组中。
每次在调用transformation操作时,都会调用DataStream#doTransform函数,而在该函数中,会根据该转换操作来创建一个SingleOutputStreamOperator对象,并把该对象添加到流执行环境env的转换操作数组中,该数组是由变量:StreamExecutionEnvironment#transformations来定义的。
(4)当调用的转换操作完成时,就会调用流执行环境env的execute()函数来执行这些转换操作,从而得到最终的结果。
执行环境
Flink提供了两套执行环境,一套是批执行环境,其父类是:ExecutionEnvironment。一套是流执行,其父类是:StreamExecutionEnvironment。这里我们关注流执行环境。对于流执行环境,又可以分为本地环境和远端环境,各种类型的环境对应的实现类,如下表所示:
类别 | 本地环境 | 远端环境 |
---|
批执行环境 | LocalEnvironment | RemoteEnvironment | 流执行环境 | LocalStreamEnvironment | RemoteStreamEnvironment |
执行任务:execute
当应用程序调用execute(jobName)时,最终会调用父类的StreamExecutionEnvironment#execute来执行任务。该函数会继续调用StreamExecutionEnvironment#executeAsync函数。
由于最终的StreamGraph是要在执行器(需要实现PipelineExecutor接口)中执行,所以在executeAsync函数中,会根据配置信息来获取具体的PipelineExecutorFactory实现类对象,该对象是执行器(Executor)的工厂类,用来获取具体的执行器对象,通过执行器对象的execute(StreamGraph, Configuration)来执行StreamGraph构成的任务。
executeAsync函数
executeAsync函数实现流程如下:
(1)根据配置获取一个Pipeline执行器工厂对象:PipelineExecutorFactory
(2)获取执行器对象,并执行StreamGraph,返回一个CompletableFuture 对象:jobClientFuture。
(3)通过jobClientFuture对象得到JobClient对象:jobClient。
其代码概要如下:
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
...
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
...
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);
...
try {
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
...
}
}
获取Executor的工厂对象
根据配置来获取Executor的工厂对象。可能的工厂类有:
- LocalExecutorFactory:创建本地Executor工厂类,该工厂创建的executor是:LocalExecutor。
- RemoteExecutorFactory:创建远程Executor的工厂类。该工厂创建的Executor是:RemoteExecutor。当部署模式为standalone是,会创建此类Executor。
- KubernetesSessionClusterExecutorFactory:创建这样的executor,它们在一个存在的session集群上执行。该工厂创建的Executor是:KubernetesSessionClusterExecutor。
- YarnJobClusterExecutorFactory:创建在指定的cluster上执行job的工厂类。该工厂创建的Executor是:YarnJobClusterExecutor。
- YarnSessionClusterExecutorFactory:创建在yarnSession上执行的Executor的工厂类。该工厂创建的Executor是 :YarnSessionClusterExecutor。
Executor的工厂对象
不同的执行器工厂获得的执行器对象不同。使用哪种执行器工厂来创建执行器是由Flink任务的执行参数来决定的。Executor的工厂对象和执行器类的对应关系如下表:
执行器工厂 | 执行器 | 说明 |
---|
YarnSessionClusterExecutorFactory | YarnSessionClusterExecutor | Yarn session模式 | YarnJobClusterExecutorFactory | YarnJobClusterExecutor | Yarn job模式 | RemoteExecutorFactory | RemoteExecutor | standalone模式 | LocalExecutorFactory | LocalExecutor | 本地模式 | KubernetesSessionClusterExecutorFactory | KubernetesSessionClusterExecutor | k8s模式 | EmbeddedExecutorFactory | EmbeddedExecutor | 嵌入模式 | WebSubmissionExecutorFactory | EmbeddedExecutor | |
每个具体的执行器都有一个execute()函数,该函数是StreamGraph的执行者。execute函数的流程和每个执行器和执行环境相关,每个执行器的具体的执行过程,后面会继续分析。
小结
本文描述了Flink任务执行的总体流程,不同的执行环境其执行的流程不同,具体的执行流程在后面继续分析。
|