Spark运行时的基本工作流程如下:
- (1)任何spark应用程序都包含driver代码和executor代码。spark应用程序首先在driver初始化SparkContext。因为SparkContext是spark应用程序通往集群的唯一路径,在SparkContext中包含了DAGScheduler和TaskScheduler两个调度器类。在创建SparkContext对象的同时也自动创建了这两个类。
- (2)SparkContext初始化完成以后,首先根据Spark的相关配置,向Cluster Manager申请所需资源,然后在各个Worker节点初始化相应的Executor。Executor初始化完成以后,Driver将通过对Spark应用程序中的RDD进行解析,生成相应的RDD graph,该图描述了RDD之间的依赖关系。
- (3)RDD图构建完毕以后,Driver将提交给DAGScheduler进行解析。DAGScheduler在解析RDD图的过程中,当遇到Action算子的时候将进行逆向解析,根据RDD之间的依赖关系以及是否存在Shuffle等,将Job解析成一系列具有先后依赖关系的Stage。
- (4)DAGScheduler将划分的一系列的Stage按照先后顺序依次提交给底层的调度器TaskScheduler去执行。
- (5)TaskScheduler接收到来自DAGScheduler的stage以后,将会在集群环境中构建一个TaskSetManager实例来管理Stage(TaskSet)的生命周期。
- (6)TaskSetManager将会将相关的计算代码、数据资源文件等发送到相应的Executor上,并在相应的Executor上启动线程池执行。TaskSetManager在执行的过程之中,使用了一些优化的算法,用于提高执行的效率,譬如根据数据本地性决定每个Task的最佳位置、推测执行碰到Straggle任务需要放到别的结点上重试、出现Shuffle输出数据丢失时要报告fetch failed错误等机制。
- (7)在Task的执行过程中,可能有部分应用程序涉及到I/O的输入输出,在每个Executor由相应的BlockManager进行管理,相关BlockManager的信息将会与Driver中的Block tracker进行交互和同步。
- (8)在Task Threads执行的过程中,如果存在运行错误、或其他影响的问题导致失败,TaskSetManager将会默认尝试3次,尝试均失败以后将上报TaskScheduler,TaskScheduler如果解决不了,再上报DAGScheduler,DAGScheduler将根据各个Worker节点 的运行情况重新提交到别的Executor中执行。
- (9)Task Threads执行完成以后,将把执行的结果反馈给TaskSetManager,TaskSetManager反馈给TaskScheduler,TaskScheduler反馈给DAGScheduler,DAGScheduler将根据是否还存在待执行的Stage,将继续循环迭代提交给TaskScheduler去执行。
- (10)待所有的Stage都执行完成以后,将会最终达到应用程序的目标,或者输出到文件、或者在屏幕上显示等,Driver的本次运行过程结束,等待用户的其他指令或者关闭。
- (11)在用户显式关闭SparkContext后,整个运行过程结束,相应的资源被释放或回收。
从以上工作流可以看出,所有的Spark程序都离不开SparkContext和Executor两部分,每个Spark应用都有自己的Executor进程,此进程的生命周期和整个Application的生命周期相同,此进程内部维持着多个线程来并行地执行分配给他的Task。这种运行形式有利于不同的Application之间的资源调度隔离,但也意味着不同的Application之间难以做到相互通信和信息交换。同时需要注意由于Driver负责所有的任务调度,所以他应该尽可能地靠近worker结点。如果能在一个网络环境中那就更好了、
|