| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink 内核原理与实现-作业提交 -> 正文阅读 |
|
[大数据]Flink 内核原理与实现-作业提交 |
可以关注下公众号,专注数据开发、数据架构之路,热衷于分享技术干货。?一、提交流程? ? ? ? Flink作业在开发完毕之后,需要提交到Flink集群执行。ClientFronted是入口,触发用户开发的Flink应用Jar文件中的main方法,然后交给PipelineExecutor(流水线执行器,在FlinkClient 升成JobGraph之后,将作业提交给集群的重要环节。)#execue方法,最终会选择一个触发一个具体的PiplineExecutor执行。
提交模式又可分为:
1.1 Yarn Session提交流程启动集群:
? ? ? ? 如果提交到已经存在的集群,则获取Yarn集群信息、应用ID,并准备提交作业。如果启动新的Yarn Session集群,则进入步骤(2)
? ? ? ? 1)如果没有集群,则创建一个新的Session模式的集群。首先将应用配置(flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户Jar文件、JobGraph对象等)上传至分布式存储(如HDFS)的应用暂存目录。 ? ? ? ? 2)通过Yarn Client 向Yarn 提交Flink创建集群的申请,Yarn分配资源,在申请的Yarn Container中初始化并启动FlinkJobManager进程,在JobManager进程中运行YarnSessionClusterEntrypoint作为集群启动入口(不同的集群部署模式有不同的ClusterEntrypoint实现),初始化Dispatcher、ResourceManager,启动相关的RPC服务,等待Client通过Rest接口提交作业。 作业提交: Yarn 集群准备好后,开始作业提交。 1)Flink Client通过Rest向Dispatcher提交JobGraph。 2)Dispatcher是Rest接口,不负责实际的调度、执行方面的工作,当收到JobGraph后,为作业创建一个JobMaster,将工作交给JobManager(负责作业调度、管理作业和Task的生命周期),构建ExecutionGraph(JobGraph的并行化版本,调度层最核心的数据结构)。 这两个步骤结束后,作业进入调度执行阶段。 作业调度执行: 1)JobMaster向YarnResourceManager申请资源,开始调度ExecutionGraph执行,向YarnResourceManager申请资源;初次提交作业集群中尚没有TaskManager,此时资源不足,开始申请资源。 2)YarnResourceManager收到JobManager的资源请求,如果当前有空闲Slot则将Slot分配给JobMaster.,否则YarnResourceManager将向YarnMaster请求创建TaskManager。 3)YarnResourceManager将资源请求加入到等待请求队列,并通过心跳向Yarn RM 申请新的Container资源来启动TaskManager进程,Yarn分配新的Container给TaskManager。 4)YarnResourceManager启动,然后从HDFS加载Jar文件等所需要的的相关资源,在容器中启动TaskManager。 5)TaskManager启动之后,向ResourceManager注册,并把自己的Slot资源情况汇报给ResouceManager。 6)ResourceManager从等待队列中取出Slot请求,向TaskManager确认资源可用情况,并告知TaskManager将Slot分配给哪个JobMaster。 7)TaskManager向JobMaster提供Slot,JobMaster调度Task到TaskManager的此Slot上执行。 1.2 Yarn Per-Job提交流程启动集群:
作业提交: ? ? ? ? 该步骤与Seesion模式下的不同,Client并不会通过Rest向Dispatcher提交JobGraph,由Dispatcher从本地文件系统获取JObGraph,其后的不好走与Session模式的一样 作业调度执行: ? ? ? ? 与Yarn Session模式下一致。 1.3 K8s Session提交流程?启动集群:
两个步骤完成之后,Session模式的集群就创建成功,集群可以接收作业提交请求,但是此时还没有JobManager、TaskManager,当作业需要执行时,才会按需创建。 作业提交:
????????两个步骤完成之后,作业进入调度执行阶段。 作业调度执行: ? ? ? ? K8s Session模式集群下,ResourceManager向k8sMaster申请和释放TaskManager,除此之外,作业的调度与执行和Yarn模式是一样的。 ? ? ? ? 1)JobMaster向KubernetesResourceManager请求Slot。 ? ? ? ? 2)KubernetesResourceManager从kubernetes集群分配TaskManager。每个TaskManager都是具有唯一标识的Pod。KubernetesResourceManager会为TaskManager生成一份新的配置文件,里面有Flink Master的service name 作为地址。这样在FLInkMaster failover之后,TaskManager仍然可以重新连上。 ? ? ? ? 3)Kubernetes集群分配一个新的Pod后,在上面启动TaskManager。 ? ? ? ? 4)TaskManager启动后注册到SlotManager。 ? ? ? ? 5)SlotManager向TaskManager请求Slot. ? ? ? ? 6)TaskManager 提供Slot给JobMaster,然后任务就会被分配到这个Slot上运行。 二、Graph总览
2.1 流图使用DataStreamAPI 开发的应用程序,首先被转换为Transformation,然后被映射为StreamGraph。 2.1.1 SteramGraph核心对象
????????StreamNode是StremGraph中的节点 ,从Transformation转换而来,可以简单理解为一个StreamNode表示一个算子,从逻辑上来说,SteramNode在StreamGraph中存在实体和虚拟的StreamNode。StremNode可以有多个输入,也可以有多个输出。 ? ? ? ? 实体的StreamNode会最终变成物理算子。虚拟的StreamNode会附着在StreamEdge上。
? ? ? ? StreamEdge是StreamGraph中的边,用来连接两个StreamNode,一个StreamNode可以有多个出边、入边,StreamEdge中包含了旁路输出、分区器、字段筛选输出等信息。 2.1.2 StreamGraph生成过程? ? ? ? StreamGraph在FlinkClient中生成,由FlinkClient在提交的时候触发Flink应用的main方法,用户编写的业务逻辑组装成Transformation流水线,在最后调用StreamExecutionEnvironment.execute() 的时候开始触发StreamGraph构建。 StreamGraph实际上是在StreamGraphGenerator中生成的,从SinkTransformation(输出)?向前追溯到SourceTransformation。在遍历过程中一边遍历一边构建StreamGraph。 ?在遍历Transformation的过程中,会对不同类型的Transformation分别进行转换。对于物理Transformation则转换为StreamNode实体,对于虚拟Transformation则作为虚拟StreamNode。 ?针对于某一种类型的Transformation,会调用其相应的transformxxx()函数进行转换。transfromxxx()首先转换上游Transformation进行递归转换,确保上游的都已经完成了转换。然后通过addOperator()方法构造出StreamNode,通过addEdge()方法与上游的transform进行连接,构造出StreamEdge。 在添加StreamEdge的过程中,如果ShuffleMode为null,则使用ShuffleMode PIPELINED模式,在流计算中,只有PIPLINED模式才会在批处理中设计其他模式。构建StreamEdge的时候,在转换Transformation过程中生成的 虚拟StreamNode会将虚拟StreamNode的信息附着在StreamEdge上 2.1.3 虚拟Transformation 的转换?虚拟的Transformation生成的时候不会转换为SteramNode,而是添加为虚拟节点。
2.2 作业图?JobGraph可以由流计算的StreamGraph和批处理的OptimizedPlan转换而来。流计算中,在StreamGraph的基础上进行了一些优化,如果通过OperatorChain机制将算子合并起来,在执行时,调度在同一个Task线程上,避免数据的跨线程、跨网段的传递。 2.2.1 JobGraph核心对象
? ? ? ? 经过算子融合优化后符合条件的多个SteramNode可能会融合在一起生成一个JobVertex,即一个JobVertex包含一个或多个算子,JobVertex的输入是JobEdge,输出是IntermediateDataSet。
? ? ? ? JobEdge是JobGraph中连接IntermediateDataSet和JobVertex的边,表示JobGraph中的一个数据流转通道,其上游数据源是IntermediateDataSet,下游消费者是JobVertex。数据通过JobEdge 由IntermediateDataSet传递给JobVertex。
? ? ? ? 中间数据集IntermediateDataSet是一种逻辑结构,用来表示JobVertex的输出,即该JobVertex中包含的算子会产生的数据集。不同的执行模式下,其对应的结果分区类型不同,决定了在执行时刻数据交换的模式。 ????????IntermediateDataSet的个数与该JobVertex对应的StreamNode的出边数量相同,可以是一个或者多个。 2.2.2 JobGraph生成过程StreamingJobGraphGenerator负责流计算JobGraph的生成,在转换前需要进行一系列的预处理。
?预处理完毕后,开始构建JobGraph中的点和边,从Source向下遍历StreamGraph,逐步创建JObGraph,在创建的过程中同事完成算子融合(OperatorChain)优化。 ?执行具体的Chain和JobVertex生成、JobEdge的关联、IntermediateDataSet。从StreamGraph读取数据的StreamNode开始,递归遍历同时将StreamOperator连接在一起。 ?整理构建的逻辑如下(看上图!!!): 1)从Source开始,Source与下游的FlatMap不可连接,Source是起始节点,自己成为一个JobVertx。 2)此时开始一个新的连接分析,FlatMap是起始节点,与下游的KeyedAgg也不可以连接,那么FlatMap自己成为一个JobVertex。 3)此时开始一个新的连接分析。KeyedAgg是起始节点,并且与下游的Sink可以连接,那么递归地分析Sink节点,构造Sink与其下游是否可以连接,因为Slink没有下游,所以KeyedAgg和Sink节点连接在一起,共同构成了一个JobVertex。在这个JobVertex中,KeyedAgg是起始节点,index编号为0,sink节点index编号为1. ? ? ? ? 构建JobVertex的时候需要将StreamNode中的重要配置信息复制到JobVertex中。构建好JobVertex之后,需要构建JobEdge将JobVertex连接起来。KeyedAgg和Sink之间构成了一个算子连接,连接内部的算子之间无序构成JobEdge进行连接。 ? ? ? ? 在构建JobEdge的时候,很重要的一点是确定上游JobVertex和下游JobVertex的数据交换方式。此时根据ShuffleMode来确定ResultPartition类型,用FlinkPartition来确定JobVertex的连接方式。 ? ? ? ? Shuffle确定了ResultPartition,那么就可以确定上游JobVertex输出的IntermediateDataSet的类型了,也就知道JobEdge的输入IntermediateDataSet。 ? ? ? ? ForwardPartitioner和RescalePartitioner两种类型的Partitioner转换为DistributionPattern.POINTWISE 的分发模式。其他类型的Partitioner统一转换为DistributionPattern.ALL_TO_ALL模式。 JobGraph的构建和OperatorChain优化:
?2.2.3 算子融合一个Operatorchain在同一个Task线程内执行。OperatorChain内的算子之间,在同一个线程内通过方法调用的方式传递数据,能减少线程之间的切换,减少消息的序列化/反序列化,无序借助内存缓存区,也无须通过网络在算子间传递数据,可在减少延迟的同时提高整体吞吐量 operatorchain的条件: 1)下游节点的入度为1 2)SteramEdge的下游节点对应的算子不为null 3)StreamEdge的上游节点对应的算子不为null 4)StreamEdge的上下游节点拥有相同的slotSharingGroup,默认都是default. 5)下游算子的连接策略为ALWAYS. 6)上游算子的连接策略为ALWAYS 或者HEAD. 7)StreamEdge的分区类型为ForwardPartitioner 8)上下游节点的并行度一致 9)当前StreamGraph允许chain 2.3 执行图2.3.1 ExecutionGraph核心对象?
? ? ? ? 该对象和JobGraph中的JobVertex一一对应。该对象还包含了一组ExecutionVertex,数量与该JobVertex中所包含的SteramNode的并行度一致。 ? ? ? ? ExecutionJobVertex用来将一个JobVertex封装成一ExecutionJobVertex,并以此创建ExecutionVertex、Execution、IntermediateResult和IntermediateResultPartition,用于丰富ExecutionGraph。 ? ? ? ? 在ExecutionJobVertex的构造函数中,首先是依据对应的JobVertex的并发度,生成对应个数的ExecutionVertex。其中,一个ExecutionVertex代表一个ExecutionJobVertex的并发子Task。然后是将原来JobVertex的中间结果IntermediateDataSet转化为ExecutionGrap中IntermediateResult
? ? ? ? ExecutionJobVertex中会对作业进行并行化处理,构造可以并行执行的实例,每个并行执行的实例就是ExecutionVertex. ? ? ? ? 构建ExecutionVertex的同时,也回构建ExecutionVertex的输出IntermediateResult。并且将ExecutionEdge输出为IntermediatePartition。 ? ? ? ? ExecutionVertex的构造函数中,首先会创建IntermediatePartition,并通过IntermediateResult.setPartition()建立IntermediateResult和IntermediateResultPartition之间的关系,然后生成Execution,并配置资源相关。
????????IntermediateResult又叫做中间结果集,该对象是个逻辑概念,表示ExecutionJobVertex的输出,和JobGraph中的IntermediateDataSet一一对应,同样,一个ExecutionJobVertex可以有多个中间二级果,取决于当前JobVertex有几个出边。 ? ? ? ? 一个中间结果集包含多个中间结果分区IntermediateResultPartition,其个数等于该JobVertex的并发度。
????????IntermediateResultPartition又叫做中间结果分区,表示1个ExecutionVertex输出结果,与ExecutionEdge相关联。
? ? ? ? 表示ExecutionVertex的输入,连接到上游产生的IntermediateResultPartition。一个Execution对应于唯一的一个IntermediateResultPartition和一个ExecutionVertex。一个ExecutionVertex可以有多个ExecutionEdge。
? ? ? ? ExecutionVertex相当于每个Task的模板,在真正执行的时候,会将ExecutionVertex中的信息包装为一个Execution,执行一个ExecutionVertex的一次尝试。JobManager和TaskManager之间关于Task的部署和Task执行状态的更新都是通过ExecutionAttemptID来标识实例的。在故障或者数据需要重算的情况下,ExecutionVertex可能会有多个ExecutionAttemptID.一个Execution通过ExecutionAttemptID标识。 2.3.2 ExecutionGrap生成过程? ? ? ? 初始话作业调度器的时候,根据JobGraph生活ExecutionGraph。在SchedulerBase的构造方法中触发构建,最终调用SchedulerBase#createExecutionGraph 触发实际的构建动作,使用ExecutionGraphBuiler构建ExecutionGraph。 ?核心代码attachJobGraph: ?构建ExecutionEdge 的连接策略:
? ? ? ? 该策略用来连接当前ExecutionVertex与上游的IntermediataeResultParition。 ? ? ? ? 连接分三种情况 ? ? ? ? 1)一对一连接:并发的Task数量与分区数相等。 ? ? ? ? 2)多对一连接:下游的Task数量小于上游的分区数,此时分两种情况: ? ? ? ? ? ? ? ? a:下游Task可以分配同数量的结果分区IntermediataeResultParition。如上游有4个结果分区,下游有2个Task,那么每个Task会分配两个结果分区进行消费。 ? ? ? ? ? ? ? ? b:每个Task消费的上游分区结果数据不均,如上游有3个结果分区,下游有两个Task,那么一个Task分配2个结果分区消费,另一个分配一个结果分区消费。 ? ? ? ? 3)一对多连接:下游的Task数量多余上游的分区数,此时两种情况: ? ? ? ? ? ? ? ? a:每个结果分区的下游消费Task数据量相同,如上游有两个结果分区,下游有4个Task,每个结果分区被两个Task消费。 ? ? ? ? ? ? ? ? b:每个结果分区的下游消费Task数量不相同,如上游有两个结果分区,下游有3个Task,那么一个结果分区分配2个Task消费,另一个结果分区分配一个Task消费。
? ? ? ? 该策略下游的ExecutionVertex与上游的所有IntermediataeResultParition建立连接,消费其生产的数据。一般全连接的情况意味着数据在Shuffle。 接下来Flink资源管理篇,如果对Flink感兴趣或者正在使用的小伙伴,可以加我入群一起探讨学习。参考书籍《Flink 内核原理与实现》欢迎关注公众号: 数据基石? |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/18 13:58:39- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |