一、Flink相关API说明
Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大
注意:我自己使用的是flink 1.7.2版本,但是在Flink1.12时支持流批一体,DataSetAPI已经不推荐使用了,大家最好优先使用DataStream流式API,既支持无界数据处理/流处理,也支持有界数据处理/批处理!
二、Flink运行架构
1、flink的编程模型
2、创建工程,maven依赖准备
????<properties>
????????<encoding>UTF-8</encoding>
????????<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
????????<maven.compiler.source>1.8</maven.compiler.source>
????????<maven.compiler.target>1.8</maven.compiler.target>
????????<java.version>1.8</java.version>
????????<!--?Flink的架构中使用了?Akka?来实现底层的分布式通信,而?Akka?是用?Scala?开发的。我们用到的?Scala?版本为?2.12。-->
????????<scala.version>2.12</scala.version>
????????<flink.version>1.12.0</flink.version>
????</properties>
?<sourceDirectory>src/main/java</sourceDirectory>
????????<plugins>
????????????<!--?编译插件?-->
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-compiler-plugin</artifactId>
????????????????<version>3.5.1</version>
????????????????<configuration>
????????????????????<source>1.8</source>
????????????????????<target>1.8</target>
????????????????????<!--<encoding>${project.build.sourceEncoding}</encoding>-->
????????????????</configuration>
????????????</plugin>
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-surefire-plugin</artifactId>
????????????????<version>2.18.1</version>
????????????????<configuration>
????????????????????<useFile>false</useFile>
????????????????????<disableXmlReport>true</disableXmlReport>
????????????????????<includes>
????????????????????????<include>**/*Test.*</include>
????????????????????????<include>**/*Suite.*</include>
????????????????????</includes>
????????????????</configuration>
????????????</plugin>
????????????<!--?打包插件(会包含所有依赖)?-->
????????????<plugin>
????????????????<groupId>org.apache.maven.plugins</groupId>
????????????????<artifactId>maven-shade-plugin</artifactId>
????????????????<version>2.3</version>
????????????????<executions>
????????????????????<execution>
????????????????????????<phase>package</phase>
????????????????????????<goals>
????????????????????????????<goal>shade</goal>
????????????????????????</goals>
????????????????????????<configuration>
????????????????????????????<filters>
????????????????????????????????<filter>
????????????????????????????????????<artifact>*:*</artifact>
????????????????????????????????????<excludes>
????????????????????????????????????????<!--
????????????????????????????????????????zip?-d?learn_spark.jar?META-INF/*.RSA?META-INF/*.DSA?META-INF/*.SF?-->
????????????????????????????????????????<exclude>META-INF/*.SF</exclude>
????????????????????????????????????????<exclude>META-INF/*.DSA</exclude>
????????????????????????????????????????<exclude>META-INF/*.RSA</exclude>
????????????????????????????????????</excludes>
????????????????????????????????</filter>
????????????????????????????</filters>
????????????????????????????<transformers>
????????????????????????????????<transformer?implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
????????????????????????????????????<!--?设置jar包的入口类(可选)?-->
?????????????????????????????????????<mainClass>com.tjcu.TestDataStreamYarn</mainClass>
????????????????????????????????</transformer>
????????????????????????????</transformers>
????????????????????????</configuration>
????????????????????</execution>
????????????????</executions>
????????????</plugin>
????????</plugins>
3、需求
4、编码步骤
1.准备环境-env
2.准备数据-source
3.处理数据-transformation
4.输出结果-sink
5.触发执行-execute
getExecutionEnvironment()?//推荐使用
createLocalEnvironment()
createRemoteEnvironment(String?host,?int?port,?String...?jarFiles)
5、代码实现
(1)DataSet(了解)
package?com.tjcu;
import?org.apache.flink.api.common.functions.FlatMapFunction;
import?org.apache.flink.api.common.functions.MapFunction;
import?org.apache.flink.api.java.DataSet;
import?org.apache.flink.api.java.ExecutionEnvironment;
import?org.apache.flink.api.java.operators.AggregateOperator;
import?org.apache.flink.api.java.operators.DataSource;
import?org.apache.flink.api.java.operators.FlatMapOperator;
import?org.apache.flink.api.java.operators.UnsortedGrouping;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.util.Collector;
/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?15:37
?*?@description:使用Flink完成WordCount-DataSet
?*?编码步骤
?*??1.准备环境-env
?*??2.准备数据-source
?*??3.处理数据-transformation
?*??4.输出结果-sink
?*??5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataSet?{
????public?static?void?main(String[]?args)?throws?Exception?{
?????//老版本的API如下,目前不推荐使用了
????????//1.准备环境?env
????????ExecutionEnvironment?executionEnvironment?=?ExecutionEnvironment.getExecutionEnvironment();
????????//2.准备数据-source
????????DataSource<String>?stringDataSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?hadoop?spark",?"whj?hadoop?spark");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????FlatMapOperator<String,?String>?wordsDS?=?stringDataSource.flatMap(new?FlatMapFunction<String,?String>()?{
????????????@Override
????????????public?void?flatMap(String?value,?Collector<String>?out)?throws?Exception?{
????????????????//value就是一行行的数据
????????????????String[]?words?=?value.split("?");
????????????????for?(String?word?:?words)?{
????????????????????out.collect(word);//将切割处理的一个个的单词收集起来并返回
????????????????}
????????????}
????????});
????????DataSet<Tuple2<String,?Integer>>?wordAndOne?=?wordsDS.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????@Override
????????????public?Tuple2<String,?Integer>?map(String?value)?throws?Exception?{
????????????????//value就是每一个单词
????????????????return?Tuple2.of(value,?1);
????????????}
????????});
????????//分组
????????UnsortedGrouping<Tuple2<String,?Integer>>?grouped?=?wordAndOne.groupBy(0);
????????//聚合
????????AggregateOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);
????????//输出结果-sink
????????result.print();
????}
}
(2)基于DataStream(匿名内部类-处理流)
package?com.tjcu;
import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.functions.FlatMapFunction;
import?org.apache.flink.api.common.functions.MapFunction;
import?org.apache.flink.api.java.ExecutionEnvironment;
import?org.apache.flink.api.java.operators.DataSource;
import?org.apache.flink.api.java.operators.FlatMapOperator;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.datastream.KeyedStream;
import?org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;;
/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:07
?*?@description:需求:使用Flink完成WordCount-DataStream?批处理
?*??*?编码步骤
?*??*?1.准备环境-env
?*??*?2.准备数据-source
?*??*?3.处理数据-transformation
?*??*?4.输出结果-sink
?*??*?5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataStream?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//新版本的流批统一API,既支持流处理也支持批处理
????????//1.准备环境?env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
//?????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
????????//executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批
????????//2.准备数据-source
????????DataStreamSource<String>?stringDataStreamSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?spark",?"whj");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????DataStream<String>?words?=?stringDataStreamSource.flatMap(new?FlatMapFunction<String,?String>()?{
????????????@Override
????????????public?void?flatMap(String?value,?Collector<String>?out)?throws?Exception?{
????????????????//value就是每一行数据
????????????????String[]?arr?=?value.split("?");
????????????????for?(String?word?:?arr)?{
????????????????????out.collect(word);
????????????????}
????????????}
????????});
????????DataStream<Tuple2<String,?Integer>>?wordAndOne?=?words.map(new?MapFunction<String,?Tuple2<String,?Integer>>()?{
????????????@Override
????????????public?Tuple2<String,?Integer>?map(String?value)?throws?Exception?{
????????????????//value就是一个个单词
????????????????return?Tuple2.of(value,?1);
????????????}
????????});
????????//分组:注意DataSet中分组是groupBy,DataStream分组是keyBy
????????KeyedStream<Tuple2<String,?Integer>,?String>?grouped?=?wordAndOne.keyBy(t?->?t.f0);
????????//聚合
????????SingleOutputStreamOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);
????????//TODO?3.sink
????????result.print();
????????//启动并等待程序结束
????????executionEnvironment.execute();
????}
}
env.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批
(3)代码实现-DataStream-Lambda
package?com.tjcu;
import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.typeinfo.Types;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.datastream.DataStreamSource;
import?org.apache.flink.streaming.api.datastream.KeyedStream;
import?org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;;import?java.util.Arrays;
/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:07
?*?@description:需求:使用Flink完成WordCount-DataStream?批处理
?*??*?编码步骤
?*??*?1.准备环境-env
?*??*?2.准备数据-source
?*??*?3.处理数据-transformation
?*??*?4.输出结果-sink
?*??*?5.触发执行-execute//如果有print,DataSet不需要调用execute,DataStream需要调用execute
?*/
public?class?TestDataStream?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//新版本的流批统一API,既支持流处理也支持批处理
????????//1.准备环境?env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
//?????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);//注意:使用DataStream实现批处理
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);//注意:使用DataStream实现流处理
????????//executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//注意:使用DataStream根据数据源自动选择使用流还是批
????????//2.准备数据-source
????????DataStreamSource<String>?stringDataStreamSource?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?spark",?"whj");
????????//3.处理数据-transformation
????????//3.1每一行数据按照空格切分成一个个的单词组成一个集合
????????SingleOutputStreamOperator<String>?words?=?stringDataStreamSource.flatMap(
????????????????(String?value,?Collector<String>?out)?->?Arrays.stream(value.split("?")).forEach(out::collect)
????????).returns(Types.STRING);
????????DataStream<Tuple2<String,?Integer>>?wordAndOne?=?words.map(
????????????????(String?value)?->?Tuple2.of(value,?1)
????????).returns(Types.TUPLE(Types.STRING,Types.INT));
????????//分组:注意DataSet中分组是groupBy,DataStream分组是keyBy
????????KeyedStream<Tuple2<String,?Integer>,?String>?grouped?=?wordAndOne.keyBy(t?->?t.f0);
????????//聚合
????????SingleOutputStreamOperator<Tuple2<String,?Integer>>?result?=?grouped.sum(1);
????????//TODO?3.sink
????????result.print();
????????//启动并等待程序结束
????????executionEnvironment.execute();
????}
}
(4)On-Yarn-掌握
hadoop?fs?-chmod?-R?777??/
并在代码中添加:
System.setProperty("HADOOP_USER_NAME",?"root")
package?com.tjcu;
import?org.apache.flink.api.common.RuntimeExecutionMode;
import?org.apache.flink.api.common.typeinfo.Types;
import?org.apache.flink.api.java.functions.KeySelector;
import?org.apache.flink.api.java.tuple.Tuple2;
import?org.apache.flink.api.java.utils.ParameterTool;
import?org.apache.flink.streaming.api.datastream.DataStream;
import?org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import?org.apache.flink.util.Collector;
import?java.util.Arrays;
/**
?*?@author?:王恒杰
?*?@date?:Created?in?2022/4/20?16:28
?*?@description:使用Flink完成WordCount-DataStream--使用lambda表达式--修改代码使适合在Yarn上运行?编码步骤
?*?*?1.准备环境-env
?*?*?2.准备数据-source
?*?*?3.处理数据-transformation
?*?*?4.输出结果-sink
?*?*?5.触发执行-execute//批处理不需要调用!流处理需要
?*/
public?class?TestDataStreamYarn?{
????public?static?void?main(String[]?args)?throws?Exception?{
????????//获取参数
????????ParameterTool?params?=?ParameterTool.fromArgs(args);
????????String?output?=?null;
????????if?(params.has("output"))?{
????????????output?=?params.get("output");
????????}?else?{
????????????output?=?"hdfs://node1:8020/wordcount/output_"?+?System.currentTimeMillis();
????????}
????????//1.准备环境-env
????????StreamExecutionEnvironment?executionEnvironment?=?StreamExecutionEnvironment.getExecutionEnvironment();
????????executionEnvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
????????//2.准备数据源-source
????????DataStream<String>?linesDS?=?executionEnvironment.fromElements("whj?hadoop?spark",?"whj?hadoop?spark",?"whj?hadoop",?"whj");
????????//3.处理数据-transformation
????????DataStream<Tuple2<String,?Integer>>?result?=?linesDS
????????????????.flatMap(
????????????????????????(String?value,?Collector<String>?out)?->?Arrays.stream(value.split("?")).forEach(out::collect)
????????????????).returns(Types.STRING)
????????????????.map(
????????????????????????(String?value)?->?Tuple2.of(value,?1)
????????????????).returns(Types.TUPLE(Types.STRING,?Types.INT))
????????????????//.keyBy(0);
????????????????.keyBy((KeySelector<Tuple2<String,?Integer>,?String>)?t?->?t.f0)
????????????????.sum(1);
????????//4.输出结果-sink
????????result.print();
????????//如果执行报hdfs权限相关错误,可以执行?hadoop?fs?-chmod?-R?777??/
????????System.setProperty("HADOOP_USER_NAME",?"root");//设置用户名
????????//result.writeAsText("hdfs://node1:8020/wordcount/output_"+System.currentTimeMillis()).setParallelism(1);
????????result.writeAsText(output).setParallelism(1);
????????//5.触发执行-execute
????????executionEnvironment.execute();
????}
}
/usr/apps/flink/bin/flink?run?-Dexecution.runtime-mode=BATCH?-m?yarn-cluster?-yjm?1024?-ytm?1024?
?/usr/apps/words.jar?--output?hdfs://node1:8020/wordcount/output_xx
http://node1:8088/cluster或者http://node1:50070/explorer.html#/
三、Flink原理
1、Flink角色分工
#1.?JobManager
???他扮演的是集群管理者的角色,负责调度任务,协调checkpoints、协调故障恢复、
???收集?Job?的状态信息,并管理?Flink?集群中的从节点?TaskManager。
#2.TaskManager
???实际负责执行计算的?Worker,在其上执行?Flink?Job?的一组?Task;
???TaskManager?还是所在节点的管理员,
???它负责把该节点上的服务器信息比如内存、磁盘、任务运行情况等向?JobManager?汇报。
#3.Client:
用户在提交编写好的?Flink?工程时,会先创建一个客户端再进行提交,这个客户端就是?Client
2、Flink执行流程
用户首先提交Flink程序到JobClient,经过JobClient的处理、解析、优化提交到JobManager,最后由TaskManager运行task。
(1)Standalone版本
(2)On Yarn版本实现原理
1.?Client向HDFS上传Flink的Jar包和配置
2.?Client向Yarn?ResourceManager提交任务并申请资源
3.?ResourceManager分配Container资源并启动ApplicationMaster,
??然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
??
4.?ApplicationMaster向ResourceManager申请工作资源,
??NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
??
5.?TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
3、Flink Streaming Dataflow
(1)Dataflow、Operator、Partition、SubTask、Parallelism
1.?Dataflow:Flink程序在执行的时候会被映射成一个数据流模型
2.?Operator:数据流模型中的每一个操作被称作Operator,Operator分为:Source/Transform/Sink
??????????准备数据-Source
??????????处理数据-Transformation
??????????输出结果-Sink
3.?Partition:数据流模型是分布式的和并行的,执行中会形成1~n个分区
4.?Subtask:多个分区任务可以并行,每一个都是独立运行在一个线程中的,也就是一个Subtask子任务
5.?Parallelism:并行度,就是可以同时真正执行的子任务数/分区数
(2) Operator传递模式
数据在两个operator(算子)之间传递的时候有两种模式:
两个operator用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的Source1到Map1,它就保留的Source的分区特性,以及分区元素处理的有序性。--类似于Spark中的窄依赖
这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区。--类似于Spark中的宽依赖
(3)Operator Chain(算子链)
客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行--就是SubTask。
(4)TaskSlot(任务槽) And Slot Sharing(槽共享)
每个TaskManager是一个JVM的进程, 为了控制一个TaskManager(worker)能接收多少个task,Flink通过Task Slot来进行控制。TaskSlot数量是用来限制一个TaskManager工作进程中可以同时运行多少个工作线程,TaskSlot 是一个 TaskManager 中的最小资源分配单位,一个 TaskManager 中有多少个 TaskSlot 就意味着能支持多少并发的Task处理。
Flink将进程的内存进行了划分到多个slot中,内存被划分到不同的slot之后可以获得如下好处:
1.??TaskManager最多能同时并发执行的子任务数是可以通过TaskSolt数量来控制的
2.??TaskSolt有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响。
Flink允许子任务共享插槽,即使它们是不同任务(阶段)的子任务(subTask),只要它们来自同一个作业。比如图左下角中的map和keyBy和sink 在一个 TaskSlot 里执行以达到资源共享的目的。
允许插槽共享有两个主要好处:
1.?资源分配更加公平,如果有比较空闲的slot可以将更多的任务分配给它。
2.?有了任务槽共享,可以提高资源的利用率。
slot是静态的概念,是指taskmanager具有的并发执行能力
parallelism(平行)是动态的概念,是指程序运行时实际使用的并发能力
(5)运行时组件
Flink运行时架构主要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:
-
作业资源管理器(JobManager): 分配任务、调度checkpoint做快照 -
任务管理器(TaskManager):主要干活的 -
资源管理器(ResourceManager):管理分配资源 -
分发器(Dispatcher):方便递交任务的接口,WebUI
因为Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的职责如下:
1.?控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager?所控制执行。
2.?JobManager?会先接收到要执行的应用程序,这个应用程序会包括:
???作业图(JobGraph)、逻辑数据流图(logical?dataflow?graph)和打包了所有的类、库和其它资源的JAR包。
3.?JobManager?会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),
????包含了所有可以并发执行的任务。
4.?JobManager?会向资源管理器(ResourceManager)请求执行任务必要的资源,
??也就是任务管理器(TaskManager)上的插槽(slot)。
??一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。
??而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
1.?Flink中的工作进程。通常在Flink中会有多个TaskManager运行,
???每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
2.?启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,
???TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
3.?在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
1.?主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger?插槽是Flink中定义的处理资源单元。
2.?Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
3.?当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。
??如果ResourceManager没有足够的插槽来满足JobManager的请求,
??它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
1.?可以跨作业运行,它为应用提交提供了REST接口。
2.?当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
3.?Dispatcher也会启动一个Web?UI,用来方便地展示和监控作业执行的信息。
4.?Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
(6)Flink执行图(ExecutionGraph)
由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,因为它们表示的是计算逻辑的高级视图。为了执行一个流处理程序,Flink需要将逻辑流图转换为物理数据流图(也叫执行图),详细说明程序的执行方式。
Flink 中的执行图可以分成四层:StreamGraph(逻辑流图) -> JobGraph -> ExecutionGraph -> 物理执行图。
1.?StreamGraph:是根据用户通过?Stream?API?编写的代码生成的最初的图。用来表示程序的拓扑?[tuò?pū]?结构。
2.?JobGraph:StreamGraph经过优化后生成了?JobGraph,提交给?JobManager?的数据结构。
?????????????主要的优化为,将多个符合条件的节点?chain?在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
3.?ExecutionGraph:JobManager?根据?JobGraph?生成ExecutionGraph。方便调度和监控和跟踪各个?tasks?的状态。
????????????????????ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
4.?物理执行图:JobManager?根据?ExecutionGraph?对?Job?进行调度后,
?????????????在各个TaskManager?上部署?Task?后形成的“图”,并不是一个具体的数据结构。
1.?StreamGraph:最初的程序执行逻辑流程,也就是算子之间的前后顺序--在Client上生成
2.?JobGraph:将OneToOne的Operator合并为OperatorChain--在Client上生成
3.?ExecutionGraph:将JobGraph根据代码中设置的并行度和请求的资源进行并行化规划!--在JobManager上生成
4.?物理执行图:将ExecutionGraph的并行计划,落实到具体的TaskManager上,将具体的SubTask落实到具体的TaskSlot内进行运行。
|