目录
一、Spark 里的几个概念
二、上下文对象 SparkContext 的核心属性
三、Spark RDD 的依赖关系
1.什么是依赖关系?什么是血缘关系?有什么作用?
2.RDD是如何保存依赖关系的
3.窄依赖&宽依赖及其作用
?四、Stage 划分及 Stage 调度
1.Stage 划分的基本步骤
2.Stage 调度 ??????
五、Task 划分及 Task 调度
1.Task 任务划分基本步骤
2.Task调度步骤
?前面学习了Spark作业提交到Yarn的执行流程: Spark内核(执行原理)之环境准备
????????现在学习一下Driver的工作流程。Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源;另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
????????当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝 试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册, 注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后, 将任务状态上报给Driver。
一、Spark 里的几个概念
一个 Spark 应用程序包括 SparkContext、Job、Stage 以及 Task 四个概念:
- Application:初始化一个 SparkContext 即生成一个 Application;
- Job 是以 Action 方法为界,遇到一个 Action 方法则触发一个 Job;
- Stage 是 Job 的子集,以 RDD 宽依赖(即 Shuffle)为界,遇到 Shuffle 做一次划分;
- Task 是 Stage 的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个 task。
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
Spark 的任务调度总体来说分两路进行,一路是 Stage 级的调度,一路是 Task 级的调度,总 体调度流程如下图所示:
二、上下文对象 SparkContext 的核心属性
??????? 其中,Spark RDD 通过其 Transactions 操作,形成了 RDD 血缘(依赖)关系图,即 DAG,最后通过 Action 的调用,触发 Job 并调度执行,执行过程中会创建两个调度器:DAGScheduler 和 TaskScheduler。
- DAGScheduler 负责 Stage 级的调度,主要是将 job 切分成若干 Stages,并将每个 Stage打包成 TaskSet 交给 TaskScheduler 调度。
- TaskScheduler 负责 Task 级的调度,将 DAGScheduler 给过来的 TaskSet 按照指定的调度策略分发到 Executor 上执行,调度过程中 SchedulerBackend 负责提供可用资源,其中SchedulerBackend 有多种实现,分别对接不同的资源管理系统。
????????Driver 初始化 SparkContext 过程中,会分别初始化 DAGScheduler、TaskScheduler、 SchedulerBackend 以及 HeartbeatReceiver,并启动 SchedulerBackend 以及 HeartbeatReceiver。 ????????SchedulerBackend 通过 ApplicationMaster 申请资源,并不断从 TaskScheduler 中拿到合适的Task 分发到 Executor 执行。
????????HeartbeatReceiver 负责接收 Executor 的心跳信息,监控 Executor的存活状况,并通知到 TaskScheduler。
三、Spark RDD 的依赖关系
??????? 在了解 Stage 划分之前,先了解下RDD的依赖关系。
1.什么是依赖关系?什么是血缘关系?有什么作用?
??????? 相邻的两个RDD的关系称为依赖关系,如 val rdd1 = rdd.map(_*2) 表示rdd1依赖于rdd,即新的RDD依赖于旧的RDD,多个连续的RDD的依赖关系,称之为血缘关系。
作用:
??????
看这个例子,有这样的依赖关系 :RDD1->RDD2->RDD3->RDD4,RDD4是通过读取数据源数据创建得到的,代码层面的逻辑为:
val RDD4 = sc.textFile("sssssssss")
val RDD3 = RDD4.flatMap()
val RDD2 = RDD3.map()
val RDD1 = RDD2.reduceByKey()
????????假设程序运行到val RDD1 = RDD2.reduceByKey()时失败了,那么这个Task将要面临重启。那问题来了,RDD1怎么知道我的上一步是什么呢?从头开始重新运行的话,头在哪儿呢?我们知道RDD是不会保存数据的,它只保存了数据结构和计算逻辑,如果遇到task失败,那不得整个作业重跑?
??????? 所以RDD为了提供容错性,需要将RDD之间的依赖关系保存下来,这样一旦出现错误,可以根据血缘关系将数据源重新读取进行计算。这也就是依赖关系的作用所在。
2.RDD是如何保存依赖关系的
先看一个示意图:
解读:
- RDD4依赖于文件datas/word.txt,它是通过textFile算子得到的,RDD4会保存这个依赖信息
- RDD3由RDD4经flatMap得到,自己依赖于RDD4,它会保存自己的依赖和RDD4的依赖
- RDD2由RDD3经map得到,自己依赖于RDD3,它会保存自己的依赖和RDD4、RDD3的依赖
- 依次类推
那么程序中如何查看依赖关系呢?
????????通过一个demo来测试一下,可以通过rdd.toDebugString来打印当前rdd的依赖信息:
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("TRXTest")
val sc = new SparkContext(sparkConf) //环境对象
val lines = sc.textFile("datas/word.txt")
// 打印血缘关系
println("RDD4的依赖关系:")
println(lines.toDebugString)
println("**********************")
val words = lines.flatMap(_.split(" "))
println("RDD3的依赖关系:")
println(words.toDebugString)
println("**********************")
val wordToOne = words.map(word => (word, 1))
println("RDD2的依赖关系:")
println(wordToOne.toDebugString)
println("**********************")
val wordToSum = wordToOne.reduceByKey(_ + _)
println("RDD1的依赖关系:")
println(wordToSum.toDebugString)
println("**********************")
val array = wordToSum.collect() // 收集
array.foreach(println)
sc.stop()
}
运行结果:
RDD4的依赖关系:
(1) datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
RDD3的依赖关系:
(1) MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
RDD2的依赖关系:
(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
| MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
RDD1的依赖关系:
(1) ShuffledRDD[4] at reduceByKey at Spark01_RDD_dep.scala:28 []
+-(1) MapPartitionsRDD[3] at map at Spark01_RDD_dep.scala:24 []
| MapPartitionsRDD[2] at flatMap at Spark01_RDD_dep.scala:20 []
| datas/word.txt MapPartitionsRDD[1] at textFile at Spark01_RDD_dep.scala:15 []
| datas/word.txt HadoopRDD[0] at textFile at Spark01_RDD_dep.scala:15 []
**********************
从运行结果上看,跟前面画的图展示的一致。
3.窄依赖&宽依赖及其作用
??????? 从上面的demo返回结果可以看到,在依赖关系中有MapPartitionsRDD和ShuffledRDD两种,这是什么意思呢?
MapPartitionsRDD类型的依赖有
val RDD4 = sc.textFile("sssssssss") val RDD3 = RDD4.flatMap() val RDD2 = RDD3.map()
????????可以发现,假设RDD只有一个分区,这几个RDD在转化中都没有Shuffle操作,他们都各自只依赖于一个RDD,我们称之为OneToOne依赖(窄依赖)。即窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用用,窄依赖我们形象的比喻为独生子女。
??????? 那么对应的val RDD1 = RDD2.reduceByKey() 由于产生了Shuffle操作,RDD1的依赖于上游RDD对个分区的数据,这个依赖,称之为Shuffle依赖(宽依赖)。即同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle,总结:宽依赖我们形象的比喻为多生。
??????? 那么窄依赖和宽依赖有什么作用呢?这就是我们接下来要学习的:Stage 划分。
?四、Stage 划分及 Stage 调度
????????Spark 的任务调度是从 DAG 切割开始,主要是由 DAGScheduler 来完成。当遇到一个Action 操作后就会触发一个 Job 的计算,并交给 DAGScheduler 来提交。
1.Stage 划分的基本步骤
- 先创建一个ResultStage
- 根据传入的RDD(最后一个,记做当前RDD),找它的依赖
- 如果遇到Shuffle依赖,就创建一个ShuffleMapStage
- 然后将当前RDD指向依赖的RDD
- 其实在每次创建前,都会检查当前RDD是的依赖是否是Shuffle依赖
- 如果是,就会往前创建stage
- 所以:spark中阶段的划分=shuffle依赖的数量+1
- 每个阶段之间是独立的
看一个简单的例子 WordCount 的Stage划分流程:
- Job 由 saveAsTextFile 触发,先创建一个ResultStage
- 从 RDD-3(记做当前RDD) 开始回溯搜索依赖关系,直到没有依赖的 RDD-0
- RDD- 3 依赖 RDD-2,并且是宽依赖(Shuffle依赖)
- 所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中
- RDD-2 依赖 RDD-1,RDD-1 依赖 RDD-0,这些依赖都是窄依赖
- 所以将 RDD-0、RDD-1 和 RDD-2 划分到同一个 Stage,即ShuffleMapStage中
2.Stage 调度 ??????
????????在程序执行过程中,一个Stage是否被提交,需要判断它的父 Stage 是否执行完毕,只有在父 Stage 执行完毕才能提交当前 Stage,如果一个 Stage 没有父 Stage,那么从该 Stage 开始提交。
????????Stage 提交时会将 Task 信息(分区信息以及方法等)序列化并被打包成 TaskSet 交给 TaskScheduler,一个Partition 对应一个 Task,另一方面 TaskScheduler 会监控 Stage 的运行状态,只有 Executor 丢失或者 Task 由于 Fetch 失败才需要重新提交失败的 Stage 以调度运行失败的任务,其他类型的 Task 失败会在 TaskScheduler 的调度过程中重试。
????????相对来说 DAGScheduler 做的事情较为简单,仅仅是在 Stage 层面上划分 DAG,提交 Stage 并监控相关状态信息。TaskScheduler 则相对较为复杂。
五、Task 划分及 Task 调度
1.Task 任务划分基本步骤
- stage划分好后,开始提交stage,从ResultStage开始
- 提交之前会判断当前stage前面有没有stage,
- 如果有,先提交前面的stage,交给TaskScheduler
- 如果没有,开始创建task
- 每个stage总共有多少个task取决于一个stage中,最后一个RDD的分区数量
- 所以总共的任务数量就是每个stage最后一个RDD的分区数量之和
2.Task调度步骤
如何将划分好的Task发送到Executor中执行呢?
?
- 将划分好的task 组成TaskSet
- 再将TaskSet封装成TaskSetManage
- 构造调度器(默认FIFO调度器,yarn的RM也有调度器)
- 调度器里有个属性叫rootPool,可以理解为任务池
- 构造器将TaskSetManage放到rootPool中
- 后面遍历rootPool调度任务
- Task中包含计算逻辑和数据(应该是数据的存放位置)
- Task中有个属性:本地化级别
- 计算和数据的位置(计算:task,此时在driver上,数据在Executor上)存在不同的级别,这个级别称之为本地化级别
- 这个级别可以解决的问题:有一个driver,一个task,两个Executor:A,B
- 如果一个Task需要的数据在A上,当任务发到A时,效率最高,否则B需要从A拷贝数据
- 那么如何决定Task发到哪儿呢?--task的本地化级别
- 如何确定Task的本地化级别呢?
- 通过调用 getPreferrdeLocations()得到 partition 的优先位置,
- 由于一个partition对应一个Task,此par的优先位置就是task的优先位置
- 根据每个Task的优先位置,确定Task的本地化级别
- 本地化级别(由高到低):
- 进程本地化:数据和计算在同一个Executor进程中。
- 节点本地化:数据和计算在同一个节点中,task和数据不再同一个Executor进程中,数据需要在进程间进行传输。
- 机架本地化:数据和计算在同一个机架的两个节点上,数据需要通过网络在节点间进行传输。
- 无首选:对于task来说,从哪获取数据都一样,没有好坏之分。
- 任意:task和数据可以在集群的任何地方,且不在一个机架中,性能最差。
- 经过遍历筛选后最终拿到task,开始启动任务(def launchTask)
- 找到task需要发送的executorEndpoint(终端),给它发消息
- 发什么?--launchTask()
- 将任务序列化之后发送过去
????????TaskSetManager 负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。
|