Spark源码之Stage的划分
一段Spark代码提交之后,遇到action算子会被划分为一个job,job提交到DAGSchedule之后,怎么根据有向无环图划分Stage呢?
一、Stage逻辑
先看一下Stage的规律
explain
select * from t1
hive (test)> explain select * from t1;
OK
Explain
STAGE DEPENDENCIES:
Stage-0 is a root stage
STAGE PLANS:
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
TableScan
alias: t1
Select Operator
expressions: name (type: string), age (type: int)
outputColumnNames: _col0, _col1
ListSink
可以看到只有一个阶段,Stage-0
如果是join操作呢?
explain
select * from t1
left join t2
on t1.name = t2.name;
Explain
STAGE DEPENDENCIES:
Stage-2 is a root stage
Stage-1 depends on stages: Stage-2
Stage-0 depends on stages: Stage-1
可以看到有三个阶段:Stage-0、Stage-1、Stage-2
如果是分组求和操作呢?
explain
select name,sum(age) ages
from t1
group by name
Explain
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
可以看到有两个阶段:Stage-0、Stage-1
explain
select name, concat_ws(',', collect_set(cast(age as string)))
from t1
group by name;
去重操作?
explain
select distinct * from t1;
Explain
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
可以看到有两个阶段:Stage-0、Stage-1
二、Stage划分原理
这里可以看出来,其实Spark中Stage的划分,好像和一些join、group by、distinct等这些有关系,那么Spark在划分Stage的时候是以什么为区分呢?
其实Spark的底层是根据RDD的血缘依赖来划分Stage的。
我们知道,Spark中当触发一个action算子时候,就会形成一个job,这个job就交给DAGSchedule,根据有向无环图来划分Stage。
具体划分策略是,由最终的 RDD 不断通过 依赖回溯判断父依赖是否是宽依赖,即以 Shuffle 为界,划分 Stage,窄依赖的 RDD 之 间被划分到同一个 Stage 中,可以进行 pipeline 式的计算。划分的 Stages 分两类,一类 叫做 ResultStage,为 DAG 最下游的 Stage,由 Action 方法决定,另一类叫做 ShuffleMapStage,为下游 Stage 准备数据
三、要点总结
总而言之,Stage的划分是根据RDD的宽依赖为界的,即是根据Shuffle的数量,Stage的数量就是Shuflle的数量+1。可能产生shuffle的算子:repartition类、xxxByKey类、join类
|