1.分布式计算引擎的发展
- (1) 第一代:MapReduce
- 应用:传统的离线计算
- 缺点:基于磁盘存储,读写性能差,灵活性差
- (2) 第二代:Tez、Storm
- 应用:离线计算
- 缺点:Tez基于MR做了颗粒度拆分,提高了灵活性,但还是基于磁盘,读写性能没有改善
- Storm应用于实时计算
- (3) 第三代:Spark
- 离线+实时:lambda架构
- 目前在工作中主要使用的离线计算引擎
- (4) 第四代:Flink
- 所有计算全部都是通过实时来实现的:Kappa架构
- 目前在工作中主要使用实时计算引擎
2.Spark简介
- (1) 定义:Spark是一个高速的、统一化的分布式分析计算引擎和机器学习算法库
- (2) 功能:
- ① 实现离线数据批处理:SparkCore
- ② 实现交互式数据分析:SparkSQL
- ③ 实现实时数据处理:SparkStreaming / StructStreaming
- ④ 实现机器学习的开发:Spark ML lib
- ⑤ 图计算:SparkGraphx
- (3) 特点:
- ① 快:所有处理和计算积极使用内存来实现
- ② 通用性强:功能全面,适用于大多数大数据分析计算场景
- ③ 易于使用:接口丰富(Java/Python/SQL/DSL/R/Scala)
- ④ 随处运行:数据源接口丰富,可运行在各种资源的平台上
- (4) 名词介绍
- ①SparkCore:基于代码的离线批处理计算,类似于MR
- ②SparkSQL
- 1)离线:基于SQL实现离线批处理计算,类似于Hive
- 2)实时:StructStreaming,使用SQL实现实时数据计算
- ③Streaming:基于SparkCore实现实时数据计算
- ④MLlib:机器学习算法库
- ⑤Graphx:图计算,数据结构中的图的计算
- (5) 使用场景
- ①利用SparkCore和SparkSQL进行离线分析计算
- ②利用SparkStreming和StructStreming实时分析计算(目前工作中实时主要用Flink代替Spark)
3.Spark与MapReduce对比

- 问题:Spark为什么比MR快?
- ①Spark积极使用内存式计算(RDD);
- ②Spark使用有向无环图(DAG)执行计划,灵活性高;
- ③Spark中的Task是线程级别:节省进程的开销,只要启动一次,直到程序结束
4.Spark的基本组成
- (1) 基本机构组成
 分布式主从架构:
- ①主节点:master,管理节点
- ②从节点:worker,计算节点
- 1)分配资源给Execuor
- 2)运行Executor进程,实现Task的运行
- (2) 程序组成:任何一个程序都由两种进程构成:Driver和Executor
- ① Driver进程(初始化进程或者Task管理进程)
- 1)向主节点申请Executor资源,让主节点在从节点上根据需求配置启动对应的Executor
- 2)解析代码逻辑:将代码中的逻辑转换为Task
- 如果遇到RDD中的数据的使用:构建一个Job,触发Task运行
- 3)将Task分配给Executor去运行
- 4)监控每个Executor运行的Task状态
- ② Executor进程(执行进程)
- 1)运行在Worker上,使用Worker分配的资源等待运行Task
- 2)所有Executor启动成功以后会向Driver进行注册
- 3)Executor收到分配Task任务,运行Task
- (3) 各组成部分之间的关系:
- ①Master负责管理所有Worker资源
- ②Driver会向Master请求启动Executor
- ③Worker上负责运行Executor进程
- ④Driver负责解析、分配、监控所有Task线程在Executor中运行
5.Spark中的task是怎么得到的?
- step1:先读取数据:将数据变成一个RDD【分布式的集合】对象
- step2:转换处理数据:将RDD不断调用转换函数进行处理,得到新的RDD
- step3:将结果进行保存
- ① Job:触发job,当用到RDD中的数据的时候,根据RDD的来源转换关系,构建DAG
- ② Stage:根据Job中使用RDD的依赖关系,通过回溯算法将一个job中的逻辑关系,构建Stage,得到DAG图
- 1)Stage的划分:按照是否产生shuffle,如果产生shuffle,就划分一个新的stage
- 2)执行每个Stage:从Stage编号最小的开始进行执行
- 将每个Stage转换为一个Task集合:Task集合中Task的个数由RDD的分区数据决定
- 问题:为什么要根据shuffle划分Stage?
答: 同一个Stage中的所有Task可以在内存中并行运行 - ③ Task:每个Stage会对应一定个数的Task,Task个数由Stage中RDD的分区个数决定
6.Sparkcore开发
import org.apache.spark.{SparkConf, SparkContext}
object SparkCoreSimpleMode {
def main(args: Array[String]): Unit = {
val sc:SparkContext = {
val conf = new SparkConf()
.setMaster("")
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("key","value")
SparkContext.getOrCreate(conf)
}
sc.setLogLevel("WARN")
sc.stop()
}
}
- (2)Wordcount案例
经典案例Wordcount——Spark版(Sparkcore/SparkDSL/SparkSQL)https://blog.csdn.net/m0_56919489/article/details/118823291 - (3) jar包提交运行
注:在IDEA上开发代码只是为了测试代码逻辑,正式生产和工作中程序还是要打包放到集群上进行测试和使用的
- ① 基本参数选项:
- – master:指定spark程序运行环境
- – local:本地
- – spark://node1:7077:Standalone集群
- – yarn:YARN集群
- – deploy-mode:决定了Driver进程运行在本地还是某一台Worker节点上
- – name:指定程序的名称
- – class:指定运行哪个类
- – jars:指定第三方jar包
- – conf:指定配置
- ② 资源选项:
- driver选项:
- – dirver-memory:分配给Driver的内存,默认分配1GB
- – driver-cores:分配给Driver运行的CPU核数,默认分配1核
- – supervise:故障自动重启
- executor选项:
- – executor-memory:分配给每个Executor的内存数,默认为1G,所有集群模式都通用的选项
- – executor-cores:分配给每个Executor的核心数,YARN集合和Standalone集群通用的选项
- YARN:默认每个Executor分配1个core
- – num-executors NUM:YARN模式下用于指定Executor的个数,默认启动2个
- Standalone:默认分配所有可用的核数给每个Executor
- – total-executor-cores NUM:Standalone模式下用于指定所有Executor所用的总CPU核数(总核数 / 每个Executor核数 = Executor的个数)
- ③ 示例
//(1) 本地提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master local[2] \
--class
cn.test.sparkcore.SparkCoreWordCount \
~/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(2) Standalone集群提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--class cn.test.sparkcore.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(3) 指定资源提交
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--class cn.test.sparkcore.SparkCoreWordCount \
--driver-memory 512M \
--executor-memory 512M \
--executor-cores 1 \
--total-executor-cores 2 \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(4) Spark on YARN(client模式)
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode client \
--class cn.test.sparkcore.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-WordCount.jar \
/example/WordCount.data \
/output/WordCount
//(5) Spark on YARN(cluster模式)
SPARK_HOME=/export/server/spark
${SPARK_HOME}/bin/spark-submit \
--master spark://node1:7077 \
--deploy-mode cluster \
--class bigdata.itcast.cn.spark.core.wordcount.SparkCoreWordCount \
hdfs://node1:8020/spark/apps/spark-wc.jar \
/example/wc.data \
/output/wc
7.Deploymode
- (1) 功能:决定了driver进程运行的位置
- (2) 选项
- ① client:默认选项,driver进程运行在客户端上
- 问题:如果一直使用client模式,所有Spark程序的driver都运行在同一台机器,导致机器的负载比较高,Driver故障率就比较高,性能比较差
- ② cluster:driver进程运行在Worker节点上
- (3) Spark on YARN的client模式和cluster模式区别
- ① client:driver和APPMaster是共存的
- APPMaster:申请资源,运行Executor,运行在NodeManager上
- Driver:管理所有Executor、调度Job执行,解析、分配、监控Task,运行在Client上
- Executors:运行JVM进程,其中执行Task任务和缓存数据
- ② cluster:driver和APPMaster合并了,整体由统一的进程来实现
- Driver Program(AppMaster):既进行资源申请,又进行Job调度
- Executors:运行JVM进程,其中执行Task任务和缓存数据
- 问题:为什么要在YARN上运行SparkCore程序?
答:为了避免同一套物理资源由多套资源管理平台管理,导致管理混乱,一般在工作中选择使用一种公共的资源管理平台来实现,这个公共的平台就是YARN,工作中一般将所有分布式计算程序全部提交在YARN上来实现运行
8.Spark运行的基本流程

-
step1:Driver及Executor的启动
- (1) 程序运行启动Driver,默认启动客户端那台机器上
- (2) Driver向Master申请资源
- (3) 根据提交的申请,Master会在Worker上启动Executor进程
- (4) 所有Executor会向Driver反向注册
 -
step2:Main方法的执行
- (1)Driver开始解析Main方法开始的代码
- (2)代码中出现RDD数据的使用,就会触发job
- (3)job触发后,通过回溯算法对RDD的转换构建DAG图
- ①回溯算法:倒推
- ②每遇到一个Shuffle就划分一个Stage
 -
step3:Task的解析分配
- (1)根据DAG从编号最小的Stage开始,转换为Task,
- (2)每一个Stage会转换为一个TaskSet集合,TaskSet集合中会有多个Task
- (3)Driver将TaskSet中的Task分配调度到Executor中执行

9.RDD(Resilient Distributed Datasets)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
wcRdd.unpersist()
- 2、checkpoint机制
- 功能:将RDD的数据持久化存储在HDFS上,以后读取RDD的数据,可以直接从HDFS上进行读取,不需要重新构建,数据更加安全
- 应用:非常重要的RDD或者数据量非常大,内存空间比较小
- 函数:
- 设置检查点目录:sc.setCheckpointDir
- 设置检查点:rdd.checkpoint
sc.setCheckpointDir("datas/sparkcore/chk")
wcRdd.checkpoint()
-
3、 persist与checkpoint的区别
- a.生命周期
- a)persist:数据自动删除:Task结束,遇到unpersist
- b)checkpoint:只能手动删除
- b.存储内容
- a)persist:将整个RDD对象缓存在内存中,包含了RDD的所有内容(血脉关系和数据)
- b)checkpoint:只存储了RDD的数据
-
(2) RDD的五大特性
- ①A list of partitions
每个RDD由多个分区组成 - ②A function for computing each split
对RDD的处理转换本质上都是RDD每个分区的转换操作(RDD是immutable,不可变,所有RDD的转换返回的是一个新的RDD) - ③A list of dependencies on other RDDs
每个RDD都会记录与父RDD的依赖关系 - ④Optionally, a Partitioner for key-value RDDs
可选:对于二元组KV结构的RDD,在经过Shuffle的时候,可以自定义指定分区器 Spark自带的分区器:Hash分区、范围分区 - ⑤Optionally, a list of preferred locations to compute each split on
可选:在Task分配计算时,可以计算最优路径解 -
(3) RDD的创建
val seq: immutable.Seq[Int] = 1.to(10)
val rdd1: RDD[Int] = sc.parallelize(seq)
sc.parallize / sc.makeRDD:【允许构建最优路径】:并行化一个集合变成一个RDD
val rdd2: RDD[String] = sc.textFile("datas/wordcount")
val rdd3 = sc.newAPIHadoopRDD("调用Hadoop中哪个输入类","返回Key类型","返回Value类型")
sc.textFile/sc.wholeTextFile/sc.newAPIHadoopRDD:读取外部数据源
- (4) RDD分区
RDD的分区个数决定了Task的个数,决定了并行度
- ① 指定分区个数:在构建RDD时,传递分区个数参数
val seq: Seq[Int] = 1.to(10)
val seqRdd: RDD[Int] = sc.parallelize(seq,numSlices = 3)
val inputRdd =
sc.textFile("datas/wordcount",minPartitions = 3)
val partitions = seqRdd.getNumPartitions
TaskContext.getPartitionId()
- ④常见的分区个数的规则
- 1、默认:线程数,如果底层读取的是分布式系统
自定义:指定分区个数 - 2、分布式系统:实现1:1的分区关系
- a.读HDFS文件:文件的一个块就自动对应RDD的一个分区
- b.读Hbase表:表的一个region自动对应RDD的一个分区
工作中:一般读取的都是分布式系统,一般都用默认的即可 - 3、问题:如果RDD是通过另外一个RDD得到的,分区个数是多少?
答:子RDD分区个数沿用父RDD的分区个数 - 4、小文件过多,分区数太多怎么办?
答:合并读取:wholeTextFile 示例:
val rdd3: RDD[String] = sc.textFile("datas/ratings100")
val rdd4: RDD[(String, String)] =
sc.wholeTextFiles("datas/ratings100")
rdd4.take(1).foreach(tuple =>
println(tuple._1+"---------"+tuple._2))
- (5) RDD算子(即RDD函数)
-
① 转换算子(Transformation)
- 1、功能:实现RDD的转换操作
- 2、特点:是lazy模式的,返回值是一个新的RDD,不会产生job和触发Task的运行
- 3、常见算子:
- Map:构建二元组(k、v对映射)list,实现对RDD中每个元素的处理,将处理的结果放入新的RDD中进行返回
- Flatmap:扁平化list,实现对RDD中的每个元素进行扁平化处理,将RDD中的每个集合的元素合并到一个RDD中返回
- Filter:过滤,实现对RDD中元素的过滤,符合条件的元素会放入一个新的RDD中返回
- Reducebykey:分组聚合
- Sortby、sortbykey:特例,经过shuffle的转换算子,使用RangePartition,做数据采样实现分区间的有序,会读取数据,触发Job去读取数据
 -
② 触发算子(Action)
- 1、功能:实现RDD数据的读取操作
- 2、特点:触发job的运行,构建Task,实现物理上的RDD的数据读取、转换,返回值不为RDD类型,为普通类型或者为空
- 3、常见算子:
- foreach:打印输出,没有返回值
- collect:将RDD每个分区的数据放入一个数组中,返回数组
- reduce:聚合函数,返回聚合后的结果
- fold:聚合函数,返回聚合后的结果
- count:统计元素的个数返回
- Take:取RDD集合中的数据(通过单个线程从RDD的每个分区中取想要的数据)放入一个数组中,返回Driver中,返回值是数组
- saveAsTextFile:没有返回值,保存数据到外部文件系统

10.RDD常用算子
- (1) 基本函数
map、flatMap、filter、take、top、foreach - (2) 分区操作函数
- ①函数:mapPartitions,foreachPartition
- ②功能:对RDD的每个分区调用参数函数进行处理
- ③场景:基于分区的资源构建
- (3) 重分区函数
- ①函数:repartition、coalesce
- ②功能:调整RDD的分区个数,返回一个新的RDD
- ③关系
两者都用来改变RDD的partition的数量,repartition的底层调用的是coalesce方法 - ④区别
- repartition:用于调大分区个数,必须经过shuffle
- coalesce(分区个数,是否经过shuffle:默认false,不shuffle):一般用于调小分区个数,如果为true,就等于repartition效果
- (4) 聚合函数
- ①函数
- reduce:分区内聚合和分区间聚合逻辑一致,没有初始值
- fold:分区内聚合和分区间聚合逻辑一致,有初始值
- aggregate:分区内聚合和分区间聚合逻辑可以自定义,有初始值
- ②功能:实现分布式的聚合
- (5) PairRDD函数
- ①特征:xxxByKey,只有二元组类型的RDD才能调用的函数
分类: - ② 聚合函数:
- reduceByKey:用法与reduce是一致的,按照Key分组,对Value进行reduce聚合
按照Key分组 + reduce聚合 - aggregateByKey:用法与aggregate是一致的,按照Key分组,对Value进行aggregate聚合
按照Key分组 + aggregate聚合 注:reduceByKey和foldByKey,aggreageByKey底层使用的都是CombinerByKey:先分区内聚合,再分区间聚合 - ③ 分组函数:
- groupByKey:按照key进行分组
- 注意:能用reduceByKey就不要使用groupByKey+reduce
- reduceByKey:分组聚合,对每个分区内部,先分组聚合,再分区间分组聚合
- groupByKey + reduce:将所有数据放到一起再做分组
- reduceByKey的性能要高于groupByKey
- ④ 排序函数:
- (6) 关联函数:Join
rdd1:KV rdd2:KW rdd1.join(rdd2) = RDD:[K,(V,W)]
11.Spark读写外部数据
- (1) 原始数据读取
HDFS、Hbase - (2) 数据处理
SparkCore - (3) 结果存储
HDFS、Hbase、MySQL - (4) SparkCore的数据源接口
- ①parallelize / makeRDD:将一个Scala中的集合转换为一个RDD对象
- ②textFile / wholeTextFiles:读取外部文件系统的数据转换为一个RDD对象
- ③newAPIHadoopRDD / newAPIHadoopFile:调用Hadoop的InputFormat来读取数据转换为一个RDD对象
12.SparkCore中的共享变量
- (1) 共享变量
在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量 - (2) 广播变量(Broadcast Variables)
- ①功能:
- 可以将Driver中的一个变量通过广播的形式发送给Executor,将这个变量放在Executor中
- 减少数据在网络中的IO,提高性能
- 比如做Wordcount,过滤符号,只保留单词,统计每个单词出现的次数
代码:
val list = List("!",",","%","#")
val broad = sc.broadcast(list)
val rsRdd: RDD[(String, Int)] = inputRdd
.filter(line => line != null && line.trim.length > 0)
.flatMap(line => line.trim.split(" "))
.filter(word => {
val value = broad.value
!value.contains(word) && word.trim.length >0
})
.map(word => (word,1))
.reduceByKey((temp,item) => temp+item)
- (3) 累加器(Accumulators)
- ①功能:
- 实现分布式计数:一般用于结果计数或者日志调试
- a.每个分区内部计数
- b.再将每个分区的结果进行累加
- 比如做Wordcount,过滤符号,只保留单词,统计每个单词出现的次数,以及统计符号的个数

代码:
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object WordCountShareVariable {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val inputRdd = sc.textFile("datas/filter/datas.input")
val list = List("!",",","%","#")
val broad = sc.broadcast(list)
val acccnt = sc.longAccumulator("acccnt")
val rsRdd: RDD[(String, Int)] = inputRdd
.filter(line => line != null && line.trim.length > 0)
.flatMap(line => line.trim.split(" "))
.filter(word => {
val value = broad.value
if(value.contains(word)) acccnt.add(1L)
!value.contains(word) && word.trim.length >0
})
.map(word => (word,1))
.reduceByKey((temp,item) => temp+item)
rsRdd.foreach(println)
println(s"符号的个数:${acccnt.value}")
Thread.sleep(10000000L)
sc.stop()
}
}
13.Spark shuffle
- (1) 对比MapReduce的shuffle

Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等 
Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步 
- (2) Shuffle的功能
- ① 实现全局排序、分组、分区
- ② Spark在DAG调度阶段会将一个Job划分为多个Stage,上游Stage做map工作,下游Stage做reduce工作,其本质上还是MapReduce计算框架。
- ③ Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce输入中,涉及到序列化反序列化、跨节点网络IO以及磁盘读写IO等。
- ④ Spark的Shuffle分为Write和Read两个阶段,分属于两个不同的Stage,前者是Parent Stage的最后一步,后者是Child Stage的第一步
- ⑤ Stage划分为两种类型
- ShuffleMapStage,在Spark 1个Job中,除了最后一个Stage之外,其他所有的Stage都是此类型
- a.将Shuffle数据写入到本地磁盘,ShuffleWriter
- b.在此Stage中,所有的Task称为:ShuffleMapTask
- ResultStage,在Spark的1个Job中,最后一个Stage,对结果RDD进行操作
- a.会读取前一个Stage中数据,ShuffleReader
- b.在此Stage中,所有的Task任务称为ResultTask
- (3)Spark Shuffle的发展
- ① Spark在1.1以前的版本一直是采用Hash Shuffle的实现的方式
- ② 到1.1版本时参考HadoopMapReduce的实现开始引入Sort Shuffle
- ③ 在1.5版本时开始Tungsten钨丝计划,引入UnSafe Shuffle优化内存及CPU的使用
- ④ 在1.6中将Tungsten统一到Sort Shuffle中,实现自我感知选择最佳Shuffle方式
- ⑤ 到的2.0版本,Hash Shuffle已被删除,所有Shuffle方式全部统一到Sort Shuffle一个实现中
14.总结
- (1) 基本概念
- ① Application:用户开发的Spark应用程序,每个Spark程序包含了一个Driver进程和多个Executor进程在集群中
- 每个程序的Driver和Executor都是独立的,不是共享的
- ② Application jar:将开发好的程序打成jar包,提交集群运行,jar包中不能包含hadoop和spark的依赖jar包
- ③ Job:触发真正执行程序的单元,由触发函数来触发job的构建
- ④ Stage:将一个Job中根据是否产生宽依赖来划分Stage【阶段:逻辑计划】
- 算法:回溯算法
- 对于整个程序来说,Stage是Application全局编号的每个阶段,为了实现不同job之间stage的结果共享
- ⑤ TaskSet:每个Stage会转换为一个TaskSet【物理计划】,Task的集合
- 每个TaskSet中可以多个Task:Stage中的RDD的最大分区数
- ⑥ Task:物理任务,每个分区对应一个Task任务来执行
- Task由Driver中的组件进行分配运行在Executor中,使用Executor中的资源来源
- 每个Task使用1Core来完成运行
- ⑦ Driver:初始化进程,负责运行main方法,创建SparkContext对象
- 申请资源:启动Executor
- 解析、调度、监控Task
- ⑧ Executor:执行进程,运行在Worker节点上,使用Worker分配的资源负责执行Task
- ⑨ ClusterManager:分布式资源管理平台的主节点
- ⑩ Worker Node:分布式资源管理平台的从节点
- ? deploymode:决定了driver进程运行的位置,
client【客户端】、cluster【Worker节点上】 - (2) Spark内核调度流程(Job调度流程)
- Step1:集群启动
- 主——Master:管理节点
- a.接受客户端请求
- b.管理从节点Worker节点
- c.资源管理
- 从——Worker:计算节点
- 使用自己所在节点的资源运行Executor进程:给每个Executor分配一定的资源
- Step2:提交程序
- Driver进程
- a.由spark-submit脚本客户端进行启动
- b.向主节点申请Executor资源,让主节点在从节点上根据需求配置启动对应的Executor
- c.解析代码逻辑:将代码中的逻辑转换为Task
- 如果遇到RDD中的数据的使用:构建一个Job,触发Task的运行
- d.将Task分配给Executor去运行
- e.监控每个Executor运行的Task状态
- Executor进程
- a.运行在Worker上,使用Worker分配的资源等待运行Task
- b.所有Executor启动成功以后会向Driver进行注册
- c.Executor收到分配Task任务,运行Task
- Step3:运行程序
- 1)Driver开始解析Main方法开始的代码
- 2)代码中出现RDD数据的使用,就会触发job
- 3)job触发后,DAGScheduler组件通过回溯算法对RDD的转换构建DAG图,以Shuffle划分Stage
- DAGScheduler:专门负责构建DAG,对象封装在SparkContext
- 根据DAG从编号最小的Stage开始,转换为TaskSet
- 每一个Stage会转换为一个TaskSet集合,TaskSet集合中会有多个Task
- TaskScheduler:负责调度TaskSet中Task,对象封装在SparkContext,将每个Task提交给TaskManager
- TaskManager:负责将每个Task分配给Executor运行
- Driver将TaskSet中的Task分配调度到Executor中执行
- (3) Spark并行度
- ① 资源并行度:Executor的个数怎么决定?
- 原则:充分利用当前机器所有资源
- 假设:机器10台,每台16core,32GB,现在只运行一个程序,让这个程序得到所有资源
- CPU核数:每个Executor至少给定2Core,保证Executor中可以并行
- 内存大小:给CPU核数的2倍
- ② 数据并行度:Task的个数怎么决定?
- 原则:Task个数由分区数决定,建议Task个数为整个Executor使用的CPU核数的2 ~ 3倍
- 假设:启动了10个Executor,每个Executor2Core4GB
- a.总CPU:20个
- b. 建议:Task和分区数 = 40 ~ 60
|