2、SparkCore
2.1. Partition
2.1.1. 概念 Spark
- RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中。
- Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若 干个Partition组成。
- RDD1包含了5个Partition,RDD2包含了3个Partition,这些Partition分布在4个节点中。
2.1.2. 分区方式
2.1.3. HDFS与Partition
? hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子, 但一个盒子里的内容只可能来自同一份文件。假设block设置为128M,你的文件是260M,那么这 份文件占3个block(128+128+4)。这样的设计虽然会有一部分磁盘空间的浪费,但是整齐的 block大小,便于快速找到、读取对应的内容。(p.s. 考虑到hdfs冗余设计,默认三份拷贝,实际 上3*3=9个block的物理空间。)
? spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据 (RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
-
block位于存储空间、partition 位于计算空间, -
block的大小是固定的、partition 大小是不固定的, -
block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到。
? Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB, 因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
2.2. RDD
- RDD(Resilient Distributed Dataset) 弹性分布式数据集。
2.2.1. RDD 的五大属性
RDD 是由一系列的partition 组成的。- 函数是作用在每一个
partition(split) 上的。 RDD 之间有一系列的依赖关系。- 分区器是作用在
(K,V) 格式的RDD 上。 RDD 提供一系列最佳的计算位置。
2.2.2. RDD 流程图
注意:
2.2.3. Lineage 血统
? RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得 来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
2.3. 系统架构
Master (standalone 模式):资源管理的主节点(进程)。Cluster Manager :在集群上获取资源的外部服务(例如:standalone ;yarn ;mesos )。Worker (standalone 模式):资源管理的从节点(进程)或者说是是管理本机资源的进程。Dirver (program ):用来连接工作进程(worker )的程序 。Executor :是在一个worker 进程所管理的节点上为某Application 启动的一个个进程,这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executors 。
Application :基于Spark 的用户程序,包含driver 程序和运行在集群上的executor 程序,即一个完整的spark 应用 。Task :被发送到**executor 上的工作单元**。Job :包含很多任务(Task )的并行计算,和action 算子对应。Stage :一个job 会被拆分成很多组任务,每组任务被称为Stage (就像MapReduce 分为MapTask 和ReduceTask 一样)。
3、算子(单文件)
- Spark 记录了 RDD 之间的生成和依赖关系。但是只有当 F 进行行动操作时,Spark 才会根据 RDD 的依赖关系生成 DAG,并从起点开始真正的计算。
常见的算子如下图所示,主要也分为如下几种:
- Transformations 转换算子
- Actions 行动算子
3.1. Transformations 转换算子
Transformations 类算子是一类算子(本质就是函数)叫做转换算子,如map ,flatMap ,reduceByKey 等。Transformations 算子是延迟执行,也叫懒加载执行。
3.1.2. 常见Transformation 类算子
filter :过滤符合条件的记录数,true 保留,false 过滤掉。map :将一个RDD 中的每个数据项,通过map 中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。flatMap :先map 后flat 。与map 类似,每个输入项可以映射为0到多个输出项。sample 随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样。reduceByKey 将相同的Key 根据相应的逻辑进行处理。sortByKey /sortBy 作用在K,V格式的RDD 上,对key 进行升序或者降序排序。
3.1.3. 补充部分算子(多文件与分区)
-
转换算组 join
leftOuterJoin rightOuterJoin fullOuterJoin - 这些
join 都是作用在**K,V 格式的RDD 上。根据key 值进行连接**,例如:(K,V)join(K,W)返回(K,(V,W)) - 注意:
join 后的分区数与父RDD分区数多的那一个相同。 -
union
- 合并两个数据集。两个数据集的类型要一致。
- 返回新的
RDD 的分区数是合并RDD 分区数的总和。 -
intersection : 取两个数据集的交集。 -
subtract : 取两个数据集的差集。 -
mapPartitions
- mapPartition与
map 类似,单位是每个partition 上的数据。 -
distinct(map+reduceByKey+map) 对RDD 内数据去重。 -
cogroup
- 当调用类型
(K,V) 和(K,W) 的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)) 。 -
mapPartitionsWithIndex :类似于mapPartitions ,除此之外还会携带分区的索引值。 -
repartition :增加或减少分区。此算子会产生shuffle 。 -
coalesce :减少分区
-
coalesce 常用来减少分区,算子中第二个参数是减少分区的过程中是否产生shuffle 。 -
true 为产生shuffle ,false 不产生shuffle 。默认是false 。 -
如果coalesce 设置的分区数比原来的RDD 的分区数还多的话,第二个参数设置为false 不会起作用(转换之后分区数大于之前),如果设置成true ,效果和repartition 一样。 -
repartition(numPartitions) = coalesce(numPartitions,true)
-
groupByKey
- 作用在
K,V 格式的RDD 上。根据Key 进行分组。作用在(K,V) ,返回(K,Iterable <V>) 。 -
zip
- 将两个
RDD 中的元素(KV格式/非KV格式 )变成一个KV 格式的RDD ,两个RDD 的个数必须相同。 -
zipWithIndex
- 该函数将
RDD 中的元素和这个元素在RDD 中的索引号(从0开始)组合成(K,V) 对。
3.2. Action 行动算子
? Action 类算子也是一类算子叫做行动算子,如foreach ,collect ,count 等。Transformations 类算子是延迟执行,Action 类算子是触发执行。一个application 应用程序中有几个Action 类算子执行,就有几个job 运行。
3.2.2. 常见Action 类算子
-
count :返回数据集中的元素数。会在结果计算完成后回收到Driver 端。 -
take(n) :返回一个包含数据集前n 个元素的集合。 -
first :效果等同于take(1) ,返回数据集中的第一个元素。 -
foreach :循环遍历数据集中的每个元素,运行相应的逻辑。 -
collect :将计算结果回收到Driver 端。 -
foreachPartition :遍历的数据是每个partition 的数据。 -
countByKey
- 作用到
K,V 格式的RDD 上,根据Key 计数相同Key 的数据集元素。 -
countByValue
- 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
-
reduce
3.3. 控制算子
将RDD 持久化,持久化的单位是partition 。
- 控制算子有三种,
cache ,persist ,checkpoint 。 cache 和persist 都是懒执行的。必须有一个**action 类算子触发执行**。checkpoint 算子不仅能将RDD 持久化到磁盘,还能切断RDD 之间的依赖关系。
3.3.1. cache
默认将RDD 的数据持久化到内存中。cache 是懒执行。
cache() = persist() = persist(StorageLevel.Memory_Only)
测试代码:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");
long startTime = System.currentTimeMillis();
long count = lines.count();
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime));
long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime - countStartTime));
jsc.stop();
3.3.2. persist
可以指定持久化的级别。最常用的是MEMORY_ONLY 和MEMORY_AND_DISK 。
持久化级别如下:
上面这些带有_2 的表示有副本replication 。
cache 和persist 的注意事项:
persist 懒执行,必须有一个action 类算子触发执行。cache 和persist 算子的返回值可以赋值给一个变量,在其他job 中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition (RDD的组成)。cache 和persist 算子后不能立即紧跟action 算子。
3.3.3. checkpoint
checkpoint 将RDD 持久化到磁盘,还可以切断RDD 之间的依赖关系,也是懒执行。
执行原理:
- 当
RDD 的job 执行完毕后,会从final RDD 从后往前回溯。 - 当回溯到某一个
RDD 调用了checkpoint 方法,会对当前的RDD 做一个标记。 Spark 框架会自动启动一个新的job ,重新计算这个RDD 的数据,将数据持久化到HDFS 上。
? 使用checkpoint 时常用优化手段:对RDD 执行checkpoint 之前,最好对这个RDD 先执行cache ,这样新启动的job 只需要将内存中的数据拷贝到HDFS 上就可以,省去了重新计算这一步。
demo 示例如下:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("checkpoint");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setCheckpointDir("./checkpoint");
JavaRDD<String> lines = sc.textFile("./NASA_access_log_Aug95");
lines.checkpoint();
lines.count();
jsc.stop();
4、Spark集群搭建
Spark集群搭建 :https://blog.csdn.net/weixin_43660536/article/details/119522431 .
5、任务提交方式
5.1. Standalone-client
5.1.2. 执行流程:
-
client 模式提交任务后,会在客户端启动Driver 进程。 -
Driver 会向Master 申请启动Application 启动的资源。 -
资源申请成功,Driver 端将task 分发到worker 端执行,启动executor 进程(任务的分发)。 -
Worker 端(exectuor 进程)将task 执行结果返回到Driver 端(任务结果的回收)。
总结:
client 模式适用于测试调试程序。Driver 进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver 端可以看到task 执行的情况。- 生产环境下不能使用
client 模式,是因为:假设要提交100个application 到集群运行,Driver 每次都会在client 端启动,那么就会导致客户端100次网卡流量暴增的问题。
5.2. Standalone-cluster提交任务方式
5.2.2. 执行流程:
cluster 模式提交应用程序后,会向Master 请求启动Driver 。Master 接受请求,随机在集群一台节点启动Driver 进程。Driver 启动后为当前的应用程序申请资源。Driver 端发送task 到worker 节点上执行(任务的分发)。worker 上的executor 进程将执行情况和执行结果返回给Driver 端(任务结果的回收)。
注意:
Driver 进程是在集群某一台Worker 上启动的,在提交applicaition 的客户端是无法查看task 的执行情况的。
5.3. Standalone总结
5.4. yarn-client
提交命令,如下三种方式皆可进行任务提交。
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
./spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
执行原理图解:
5.4.2. 执行流程:
-
客户端提交一个Application ,在客户端启动一个Driver 进程。 -
应用程序启动后会向RS (ResourceManager )(相当于standalone 模式下的master 进程)发送请求,启动AM (ApplicationMaster )。 -
RS 收到请求,随机选择一台NM (NodeManager )启动AM 。这里的NM 相当于Standalone 中的Worker 进程。 -
AM 启动后,会向RS 请求一批container 资源,用于启动Executor 。 -
RS 会找到一批NM (包含container )返回给AM ,用于启动Executor 。 -
AM 会向NM 发送命令启动Executor 。 -
Executor 启动后,会反向注册给Driver ,Driver 发送task 到Executor ,执行情况和结果返回给Driver 端。
总结:
Yarn-client 模式同样是适用于测试。原因同Standalone-cluster原因一样。
ApplicationMaster (executorLauncher)的在此模式中的作用:
-
为当前的Application 申请资源 -
给NodeManager 发送消息启动Executor 。
注意:ApplicationMaster 在此种模式下有launchExecutor 和申请资源的功能,并没有作业调度的功能。
5.5. yarn-cluster提交任务方式
提交命令:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
或者
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
执行原理图解
5.5.2. 执行流程:
-
客户机提交Application 应用程序,发送请求到RS (ResourceManager ),请求启动AM (ApplicationMaster )。 -
RS 收到请求后随机在一台NM (NodeManager )上启动AM (相当于Driver 端)。 -
AM 启动,AM 发送请求到RS ,请求一批container 用于启动Excutor 。 -
RS 返回一批NM 节点给AM 。 -
AM 连接到NM ,发送请求到NM 启动Excutor 。 -
Excutor 反向注册到AM 所在的节点的Driver 。Driver 发送task 到Excutor 。
总结:
Yarn-Cluster 主要用于生产环境中,因为Driver 运行在Yarn 集群中某一台nodeManager 中,每次提交任务的Driver 所在的机器都是不再是提交任务的客户端机器,而是多个NM 节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看到日志。只能通过yarn 查看日志。
ApplicationMaster 在此模式中的的作用:
-
为当前的Application 申请资源 -
给NodeManger 发送消息启动Executor 。 -
任务调度。
停止集群任务命令:yarn application -kill applicationID
6、RDD之窄依赖和宽依赖
RDD 之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。
6.1. 窄依赖
- ? 父
RDD 和子RDD 的partition 之间的关系是一对一的。或者父RDD 和子RDD 的partition 关系是多对一的。不会有shuffle 的产生。
6.2. 宽依赖
父RDD 与子RDD 的partition 之间的关系是一对多。会有**shuffle 的产生**。
6.3. 宽窄依赖图理解
7、Stage
Spark 任务会根据RDD 之间的依赖关系,形成一个**DAG 有向无环图**,DAG 会提交给DAGScheduler ,DAGScheduler 会把DAG 划分成相互依赖的多个stage ,- 划分
stage 的依据就是**RDD 之间的宽窄依赖**。遇到***宽依赖***就划分stage ,每个stage 包含一个或多个task 任务。 - 然后将这些
task 以taskSet 的形式提交给TaskScheduler 运行。 stage 是由一组并行的task 组成。
7.1. stage切割规则
-
切割规则:从后往前,遇到宽依赖就切割stage 。 1.从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中; 2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的; 3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是 ShuffleMapTask; 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤; -
由于spark中stage的划分是根据shuffle来划分的,而宽依赖必然有shuffle过程,因此可以说spark是根据宽窄依赖来划分stage的。
stage计算模式–pipeline
pipeline 管道计算模式,pipeline 只是一种计算思想、模式。
注意点:
1.数据在管道里落地?
2.Stage 的task 并行度是由stage 的最后一个RDD 的分区数来决定的 。
3.如何改变RDD 的分区数?
- reduceByKey(XXX,3)
- GroupByKey(4)
- sc.textFile(path,numpartition)
使用算子时传递分区num参数 就是分区partition 的数量。
8、SparkShuffle
8.1. SparkShuffle概念
- RDD之间根据宽依赖来划分stage,产生shuffle。
- 在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段。
Shuffle Write :Shuffle Map Task对Task产生的中间数据进行操作,再根据数据分区方式对中间数据进行分区。Shuffle Read :Shuffle Read Task 会拉取Shuffle Write 阶段中产生的并已经分好区的中间数据。
Spark 中有两种Shuffle 类型,HashShuffle 和SortShuffle 。
Spark1.2 之前是HashShuffle ,Spark1.2 引入SortShuffle 。spark1.2-spark1.6 之间是有HashShuffle 和SortShuffle 的。spark2.0 就只有sortshuffle 。
哪些spark算子会有shuffle?
-
- 去重,distinct
- 排序,groupByKey,reduceByKey等
- 重分区,repartition,coalesce
- 集合或者表操作,interection,join
8.2. HashShuffle
8.2.1. 普通机制
执行流程:
1)每一个map task 将不同结果写到不同的buffer 中,每个buffer 的大小为32K 。buffer 起到数据缓存的作用。
2)每个buffer 文件最后对应一个磁盘小文件。
3)reduce task 来拉取对应的磁盘小文件。
总结:
1)map task 的计算结果会根据下个分区器(默认是hashPartitioner )来决定写入到哪一个磁盘小文件中去。ReduceTask 会去Map 端拉取相应的磁盘小文件。
2)产生的磁盘小文件的个数: M(map task的个数)*R(reduce task的个数) 。
产生的磁盘小文件过多,会导致以下问题:
-
在Shuffle Write 过程中会产生很多写磁盘小文件的对象。 -
在Shuffle Read 过程中会产生很多读取磁盘小文件的对象。 -
在JVM 堆内存中对象过多会造成频繁的gc ,gc 还无法解决运行所需要的内存的话,就会OOM 。 -
在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task 失败,TaskScheduler 不负责重试,由DAGScheduler 负责重试Stage 。
8.2.2. 合并机制
执行流程:
- 合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
- 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
总结
- 产生磁盘小文件的个数:
C(core的个数)*R(reduce的个数) 。
8.3. SortShuffle
8.3.1. 普通机制
执行流程
-
map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M 。 -
在shuffle 的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M 时,比如现在内存结构中的数据为5.01M ,那么他会申请5.01*2-5=5.02M 内存给内存数据结构。 -
如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。 -
在溢写之前内存结构中的数据会进行排序分区 -
然后开始溢写磁盘,写磁盘是以batch 的形式去写(批量),一个batch 是1万条数据。 -
map task 执行完成后,会将这些磁盘小文件 合并成一个大的磁盘文件,同时生成一个索引文件 。 -
reduce task 去map 端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
总结
- 产生磁盘小文件的个数:
2*M(map task的个数)
8.3.2. bypass机制
bypass机制示意图
总结
8.4. Shuffle文件寻址
-
MapOutputTracker
MapOutputTracker 是Spark 架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。MapOutputTrackerMaster 是主对象,存在于Driver 中。MapOutputTrackerWorker 是从对象,存在于Excutor 中。 -
BlockManager
-
BlockManager :块管理者,是Spark 架构中的一个模块,也是一个主从架构。 -
BlockManagerMaster ,主对象,存在于Driver中。 -
BlockManagerWorker ,从对象,存在于Excutor中。
BlockManagerMaster 会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知BlockManagerSlave 传输或者删除数据BlockManagerWorker ,从对象,存在于Excutor 中。BlockManagerWorker 会与BlockManagerWorker 之间通信。 -
无论在Driver 端的BlockManager 还是在Excutor 端的BlockManager 都含有四个对象:
DiskStore :负责磁盘的管理。MemoryStore :负责内存的管理。ConnectionManager :负责连接其他的BlockManagerWorker 。BlockTransferService :负责数据的传输。
Shuffle 文件寻址图
-
Shuffle 文件寻址流程 -
当map task 执行完成后,会将task 的执行情况和磁盘小文件的地址封装到MapStatus 对象中,通过MapOutputTrackerWorker 对象向Driver 中的MapOutputTrackerMaster 汇报。 -
在所有的map task 执行完毕后,Driver 中就掌握了所有的磁盘小文件的地址。 -
在reduce task 执行之前,会通过Excutor 中MapOutPutTrackerWorker 向Driver 端的MapOutputTrackerMaster 获取磁盘小文件的地址。 -
获取到磁盘小文件的地址后,会通过BlockManager 中的ConnectionManager 连接数据所在节点上的ConnectionManager ,然后通过BlockTransferService 进行数据的传输。 -
BlockTransferService 默认启动5 个task 去节点拉取数据。默认情况下,5 个task 拉取数据量不能超过48M 。
8.5. Shuffle调优
- 在代码中,不推荐使用,硬编码。
new SparkConf().set("spark.shuffle.file.buffer","64")
- 在提交spark任务的时候,推荐使用。
spark-submit --conf spark.shuffle.file.buffer=64 –-conf ….
- 在
conf 下的spark-default.conf 配置文件中,不推荐,因为是写死后所有应用程序都要用。
9、Spark RDD小结
9.1. Spark application概念小结
Application :基于Spark 的用户程序,包含driver 程序和运行在集群上的executor 程序,即一个完整的spark 应用 。
- 一个application包含多个Job(作业)。
Job :包含多个Stage(阶段),与action 算子一 一对应。Stage :一个job 会被拆分成很多组任务Stage (就像MapReduce 分为MapTask 和ReduceTask 一样)。- Task:一个被发送到executor上的工作单元。一个stage由多个Task组成(由partition决定)。
- pipline:一个Pipeline对应一个Task。一个stage由多个pipeline组成(由task决定)。
Task :被发送到**executor 上的工作单元**。- RDD:一个RDD包含了多个Partition
- partition:多个Partition是并行操作的(并行度)
9.2. Spark 代码 运行流程
10、Spark资源调度和任务调度
10.1. 调度流程
-
启动集群后,Worker 节点会向Master 节点汇报资源情况,Master 掌握了集群资源情况。 -
当Spark 提交一个Application 后,根据RDD 之间的依赖关系将Application 形成一个DAG 有向无环图。 -
任务提交后,Spark 会在Driver 端创建两个对象:DAGScheduler 和TaskScheduler ,DAGScheduler 是任务调度的高层调度器,是一个对象。 -
DAGScheduler 的主要作用就是将DAG 根据RDD 之间的宽窄依赖关系划分为一个个的Stage ,然后将这些Stage 以TaskSet 的形式提交给TaskScheduler (TaskScheduler 是任务调度的低层调度器,这里TaskSet 其实就是一个集合,里面封装的就是一个个的task 任务,也就是stage 中的并行的task 任务)。 -
TaskSchedule 会遍历TaskSet 集合,拿到每个task 后会将task 发送到Executor 中去执行(其实就是发送到Executor 中的线程池ThreadPool 去执行)。 -
task 在Executor 线程池中的运行情况会向TaskScheduler 反馈,当task 执行失败时,则由TaskScheduler 负责重试,将task 重新发送给Executor 去执行,默认重试3次。如果重试3次依然失败,那么这个task 所在的stage 就失败了。 -
stage 失败了则由DAGScheduler 来负责重试,重新发送TaskSet 到TaskScheduler ,Stage 默认重试4次。如果重试4次以后依然失败,那么这个job 就失败了。job 失败了,Application 就失败了。 -
TaskScheduler 不仅能重试失败的task ,还会重试straggling (落后,缓慢)task (也就是执行速度比其他task慢太多的task )。如果有运行缓慢的task 那么TaskScheduler 会启动一个新的task 来与这个运行缓慢的task 执行相同的处理逻辑。两个task 哪个先执行完,就以哪个task 的执行结果为准。这就是Spark 的推测执行机制。在Spark 中推测执行默认是关闭的。推测执行可以通过spark.speculation 属性来配置。 -
注意: -
-
对于ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。 -
如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task 重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。
10.2. 流程图解
10.3. 粗细粒度资源申请
10.3.1. 粗粒度资源申请(Spark)
-
在Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task 执行完成后,才会释放这部分资源。 -
优点:在Application 执行之前,所有的资源都申请完毕,每一个task 直接使用资源就可以了,不需要task 在执行前自己去申请资源,task 启动就快了,task 执行快了,stage 执行就快了,job 就快了,application 执行就快了。 -
缺点:直到最后一个task 执行完成才会释放资源,集群的资源无法充分利用。
10.3.2. 细粒度资源申请(MR)
Application 执行之前不需要先去申请资源,而是直接执行,让**job 中的每一个task 在执行前自己去申请资源,task 执行完成就释放资源**。- 优点:集群的资源可以充分利用。
- 缺点:
task 自己去申请资源,task 启动变慢,Application 的运行就响应的变慢了。
11、案例解答
spark案列解答.https://blog.csdn.net/weixin_43660536/article/details/119610731
12、广播变量和累加器
12.1. 广播变量
-
sparkScontext.broadcast() 方法用来定义广播变量 -
广播变量只能在Driver 端定义与修改,不能在Executor 端定义与修改。 -
会将广播变量封装发送到Executor的BlockManage中。 -
同个Executor的Task共享BlockManage的数据。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello yjx")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach {println}
sc.stop()
12.2. 累加器
val accumulator = sparkContext.longAccumulator 定义累加器- 累加器在
Driver 端定义赋初始值,累加器只能在Driver 端读取,在Excutor 端更新。
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()
13、spark核心源码解析
spark核心源码解析.https://blog.csdn.net/weixin_43660536/article/details/119610936
14、Spark内存管理
Spark 执行应用程序时,Spark 集群会启动Driver 和Executor 两种JVM 进程,Driver 负责创建SparkContext 上下文,提交任务,task 的分发等。Executor 负责task 的计算任务,并将结果返回给Driver 。同时需要为需要持久化的RDD 提供储存。Driver 端的内存管理比较简单,这里所说的Spark 内存管理针对Executor 端的内存管理。
Spark 内存管理分为静态内存管理和统一内存管理,Spark1.6 之前(不包含)使用的是静态内存管理,Spark1.6 之后引入了统一内存管理。
静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。
统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。
Spark1.6 及1.6 版本之后的版本默认使用的是统一内存管理。
要想使用静态内存可以通过参数spark.memory.useLegacyMode
? 设置为true (默认为false )使用静态内存管理。
静态内存管理分布图
统一内存管理分布图
reduce 中OOM如何处理?
-
减少每次拉取的数据量。 -
提高shuffle 聚合的内存比例。 -
提高Excutor 的总内存。
|