2、SparkCore
2.1. Partition
2.1.1. 概念 Spark
- RDD 是一种分布式的数据集,由于数据量很大,因此要它被切分并存储在各个结点的分区当中。
- Spark中,RDD(Resilient Distributed Dataset)是其最基本的抽象数据集,其中每个RDD是由若 干个Partition组成。
- RDD1包含了5个Partition,RDD2包含了3个Partition,这些Partition分布在4个节点中。

2.1.2. 分区方式

2.1.3. HDFS与Partition
? hdfs中的block是分布式存储的最小单元,类似于盛放文件的盒子,一个文件可能要占多个盒子, 但一个盒子里的内容只可能来自同一份文件。假设block设置为128M,你的文件是260M,那么这 份文件占3个block(128+128+4)。这样的设计虽然会有一部分磁盘空间的浪费,但是整齐的 block大小,便于快速找到、读取对应的内容。(p.s. 考虑到hdfs冗余设计,默认三份拷贝,实际 上3*3=9个block的物理空间。)
? spark中的partition 是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。partition 是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据 (RDD)的partition 大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定的
-
block位于存储空间、partition 位于计算空间, -
block的大小是固定的、partition 大小是不固定的, -
block是有冗余的、不会轻易丢失,partition(RDD)没有冗余设计、丢失之后重新计算得到。
? Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB, 因此该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
2.2. RDD
- RDD(Resilient Distributed Dataset) 弹性分布式数据集。
2.2.1. RDD的五大属性
RDD是由一系列的partition组成的。- 函数是作用在每一个
partition(split)上的。 RDD之间有一系列的依赖关系。- 分区器是作用在
(K,V)格式的RDD上。 RDD提供一系列最佳的计算位置。
2.2.2. RDD流程图

注意:
2.2.3. Lineage血统
? RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得 来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。
2.3. 系统架构
 
Master(standalone模式):资源管理的主节点(进程)。Cluster Manager:在集群上获取资源的外部服务(例如:standalone;yarn;mesos)。Worker(standalone模式):资源管理的从节点(进程)或者说是是管理本机资源的进程。Dirver(program):用来连接工作进程(worker)的程序 。Executor:是在一个worker进程所管理的节点上为某Application启动的一个个进程,这个进程负责运行任务,并且负责将数据存在内存或者磁盘上,每个应用之间都有各自独立的executors。

Application:基于Spark的用户程序,包含driver程序和运行在集群上的executor程序,即一个完整的spark应用 。Task:被发送到**executor上的工作单元**。Job:包含很多任务(Task)的并行计算,和action算子对应。Stage:一个job会被拆分成很多组任务,每组任务被称为Stage(就像MapReduce分为MapTask和ReduceTask一样)。
3、算子(单文件)
- Spark 记录了 RDD 之间的生成和依赖关系。但是只有当 F 进行行动操作时,Spark 才会根据 RDD 的依赖关系生成 DAG,并从起点开始真正的计算。

常见的算子如下图所示,主要也分为如下几种:
- Transformations 转换算子
- Actions 行动算子

3.1. Transformations转换算子
Transformations类算子是一类算子(本质就是函数)叫做转换算子,如map,flatMap,reduceByKey等。Transformations算子是延迟执行,也叫懒加载执行。
3.1.2. 常见Transformation类算子
filter:过滤符合条件的记录数,true保留,false过滤掉。map:将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。特点:输入一条,输出一条数据。flatMap:先map后flat。与map类似,每个输入项可以映射为0到多个输出项。sample 随机抽样算子,根据传进去的小数按比例进行有放回或者无放回的抽样。reduceByKey 将相同的Key根据相应的逻辑进行处理。sortByKey/sortBy作用在K,V格式的RDD上,对key进行升序或者降序排序。
3.1.3. 补充部分算子(多文件与分区)
-
转换算组 join
leftOuterJoinrightOuterJoinfullOuterJoin- 这些
join都是作用在**K,V格式的RDD上。根据key值进行连接**,例如:(K,V)join(K,W)返回(K,(V,W)) - 注意:
join后的分区数与父RDD分区数多的那一个相同。 -
union
- 合并两个数据集。两个数据集的类型要一致。
- 返回新的
RDD的分区数是合并RDD分区数的总和。 -
intersection : 取两个数据集的交集。 -
subtract : 取两个数据集的差集。 -
mapPartitions
- mapPartition与
map类似,单位是每个partition上的数据。 -
distinct(map+reduceByKey+map) 对RDD内数据去重。 -
cogroup
- 当调用类型
(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>))。 -
mapPartitionsWithIndex :类似于mapPartitions,除此之外还会携带分区的索引值。 -
repartition :增加或减少分区。此算子会产生shuffle。 -
coalesce :减少分区
-
coalesce常用来减少分区,算子中第二个参数是减少分区的过程中是否产生shuffle。 -
true为产生shuffle,false不产生shuffle。默认是false。 -
如果coalesce设置的分区数比原来的RDD的分区数还多的话,第二个参数设置为false不会起作用(转换之后分区数大于之前),如果设置成true,效果和repartition一样。 -
repartition(numPartitions) = coalesce(numPartitions,true)
-
groupByKey
- 作用在
K,V格式的RDD上。根据Key进行分组。作用在(K,V),返回(K,Iterable <V>)。 -
zip
- 将两个
RDD中的元素(KV格式/非KV格式)变成一个KV格式的RDD,两个RDD的个数必须相同。 -
zipWithIndex
- 该函数将
RDD中的元素和这个元素在RDD中的索引号(从0开始)组合成(K,V)对。
3.2. Action行动算子
? Action类算子也是一类算子叫做行动算子,如foreach,collect,count等。Transformations类算子是延迟执行,Action类算子是触发执行。一个application应用程序中有几个Action类算子执行,就有几个job运行。
3.2.2. 常见Action类算子
-
count:返回数据集中的元素数。会在结果计算完成后回收到Driver端。 -
take(n):返回一个包含数据集前n个元素的集合。 -
first:效果等同于take(1),返回数据集中的第一个元素。 -
foreach:循环遍历数据集中的每个元素,运行相应的逻辑。 -
collect:将计算结果回收到Driver端。 -
foreachPartition :遍历的数据是每个partition的数据。 -
countByKey
- 作用到
K,V格式的RDD上,根据Key计数相同Key的数据集元素。 -
countByValue
- 根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。
-
reduce
3.3. 控制算子
将RDD持久化,持久化的单位是partition。
- 控制算子有三种,
cache,persist,checkpoint。 cache和persist都是懒执行的。必须有一个**action类算子触发执行**。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
3.3.1. cache
默认将RDD的数据持久化到内存中。cache是懒执行。
cache() = persist() = persist(StorageLevel.Memory_Only)
测试代码:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./NASA_access_log_Aug95");
long startTime = System.currentTimeMillis();
long count = lines.count();
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+ (endTime-startTime));
long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime - countStartTime));
jsc.stop();
3.3.2. persist
可以指定持久化的级别。最常用的是MEMORY_ONLY和MEMORY_AND_DISK。
持久化级别如下:

上面这些带有_2的表示有副本replication。
cache和persist的注意事项:
persist懒执行,必须有一个action类算子触发执行。cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition(RDD的组成)。cache和persist算子后不能立即紧跟action算子。
3.3.3. checkpoint
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系,也是懒执行。
执行原理:
- 当
RDD的job执行完毕后,会从final RDD从后往前回溯。 - 当回溯到某一个
RDD调用了checkpoint方法,会对当前的RDD做一个标记。 Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
? 使用checkpoint时常用优化手段:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
demo示例如下:
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("checkpoint");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setCheckpointDir("./checkpoint");
JavaRDD<String> lines = sc.textFile("./NASA_access_log_Aug95");
lines.checkpoint();
lines.count();
jsc.stop();
4、Spark集群搭建
Spark集群搭建 :https://blog.csdn.net/weixin_43660536/article/details/119522431 .
5、任务提交方式
5.1. Standalone-client

5.1.2. 执行流程:
-
client模式提交任务后,会在客户端启动Driver进程。 -
Driver会向Master申请启动Application启动的资源。 -
资源申请成功,Driver端将task分发到worker端执行,启动executor进程(任务的分发)。 -
Worker端(exectuor进程)将task执行结果返回到Driver端(任务结果的回收)。
总结:
client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。- 生产环境下不能使用
client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。
5.2. Standalone-cluster提交任务方式

5.2.2. 执行流程:
cluster模式提交应用程序后,会向Master请求启动Driver。Master接受请求,随机在集群一台节点启动Driver进程。Driver启动后为当前的应用程序申请资源。Driver端发送task到worker节点上执行(任务的分发)。worker上的executor进程将执行情况和执行结果返回给Driver端(任务结果的回收)。
注意:
Driver进程是在集群某一台Worker上启动的,在提交applicaition的客户端是无法查看task的执行情况的。
5.3. Standalone总结
5.4. yarn-client
提交命令,如下三种方式皆可进行任务提交。
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
./spark-submit --master yarn–client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
执行原理图解:

5.4.2. 执行流程:
-
客户端提交一个Application,在客户端启动一个Driver进程。 -
应用程序启动后会向RS(ResourceManager)(相当于standalone模式下的master进程)发送请求,启动AM(ApplicationMaster)。 -
RS收到请求,随机选择一台NM(NodeManager)启动AM。这里的NM相当于Standalone中的Worker进程。 -
AM启动后,会向RS请求一批container资源,用于启动Executor。 -
RS会找到一批NM(包含container)返回给AM,用于启动Executor。 -
AM会向NM发送命令启动Executor。 -
Executor启动后,会反向注册给Driver,Driver发送task到Executor,执行情况和结果返回给Driver端。
总结:
Yarn-client模式同样是适用于测试。原因同Standalone-cluster原因一样。
ApplicationMaster(executorLauncher)的在此模式中的作用:
-
为当前的Application申请资源 -
给NodeManager发送消息启动Executor。
注意:ApplicationMaster在此种模式下有launchExecutor和申请资源的功能,并没有作业调度的功能。
5.5. yarn-cluster提交任务方式
提交命令:
./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
或者
./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../examples/jars/spark-examples_2.12-2.4.6.jar 10
执行原理图解

5.5.2. 执行流程:
-
客户机提交Application应用程序,发送请求到RS(ResourceManager),请求启动AM(ApplicationMaster)。 -
RS收到请求后随机在一台NM(NodeManager)上启动AM(相当于Driver端)。 -
AM启动,AM发送请求到RS,请求一批container用于启动Excutor。 -
RS返回一批NM节点给AM。 -
AM连接到NM,发送请求到NM启动Excutor。 -
Excutor反向注册到AM所在的节点的Driver。Driver发送task到Excutor。
总结:
Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是不再是提交任务的客户端机器,而是多个NM节点中的一台,不会产生某一台机器网卡流量激增的现象,但同样也有缺点,任务提交后不能看到日志。只能通过yarn查看日志。
ApplicationMaster在此模式中的的作用:
-
为当前的Application申请资源 -
给NodeManger发送消息启动Executor。 -
任务调度。
停止集群任务命令:yarn application -kill applicationID
6、RDD之窄依赖和宽依赖
RDD之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。

6.1. 窄依赖
- ? 父
RDD和子RDD的partition之间的关系是一对一的。或者父RDD和子RDD的partition关系是多对一的。不会有shuffle的产生。
6.2. 宽依赖
父RDD与子RDD的partition之间的关系是一对多。会有**shuffle的产生**。
6.3. 宽窄依赖图理解


7、Stage
Spark任务会根据RDD之间的依赖关系,形成一个**DAG有向无环图**,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,- 划分
stage的依据就是**RDD之间的宽窄依赖**。遇到***宽依赖***就划分stage,每个stage包含一个或多个task任务。 - 然后将这些
task以taskSet的形式提交给TaskScheduler运行。 stage是由一组并行的task组成。
7.1. stage切割规则
-
切割规则:从后往前,遇到宽依赖就切割stage。 1.从后向前推理,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中; 2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的; 3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是 ShuffleMapTask; 4.代表当前Stage的算子一定是该Stage的最后一个计算步骤; -
由于spark中stage的划分是根据shuffle来划分的,而宽依赖必然有shuffle过程,因此可以说spark是根据宽窄依赖来划分stage的。

stage计算模式–pipeline
pipeline管道计算模式,pipeline只是一种计算思想、模式。

注意点:
1.数据在管道里落地?
2.Stage的task并行度是由stage的最后一个RDD的分区数来决定的 。
3.如何改变RDD的分区数?
- reduceByKey(XXX,3)
- GroupByKey(4)
- sc.textFile(path,numpartition)
使用算子时传递分区num参数就是分区partition的数量。
8、SparkShuffle
8.1. SparkShuffle概念
- RDD之间根据宽依赖来划分stage,产生shuffle。
- 在Spark Shuffle阶段中,共分为Shuffle Write阶段和Shuffle Read阶段。
Shuffle Write:Shuffle Map Task对Task产生的中间数据进行操作,再根据数据分区方式对中间数据进行分区。Shuffle Read:Shuffle Read Task会拉取Shuffle Write阶段中产生的并已经分好区的中间数据。

Spark中有两种Shuffle类型,HashShuffle和SortShuffle。
Spark1.2之前是HashShuffle,Spark1.2引入SortShuffle 。spark1.2-spark1.6之间是有HashShuffle和SortShuffle的。spark2.0就只有sortshuffle。
哪些spark算子会有shuffle?
-
- 去重,distinct
- 排序,groupByKey,reduceByKey等
- 重分区,repartition,coalesce
- 集合或者表操作,interection,join
8.2. HashShuffle
8.2.1. 普通机制

执行流程:
1)每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K。buffer起到数据缓存的作用。
2)每个buffer文件最后对应一个磁盘小文件。
3)reduce task来拉取对应的磁盘小文件。
总结:
1)map task的计算结果会根据下个分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去。ReduceTask会去Map端拉取相应的磁盘小文件。
2)产生的磁盘小文件的个数: M(map task的个数)*R(reduce task的个数)。
产生的磁盘小文件过多,会导致以下问题:
-
在Shuffle Write过程中会产生很多写磁盘小文件的对象。 -
在Shuffle Read过程中会产生很多读取磁盘小文件的对象。 -
在JVM堆内存中对象过多会造成频繁的gc,gc还无法解决运行所需要的内存的话,就会OOM。 -
在数据传输过程中会有频繁的网络通信,频繁的网络通信出现通信故障的可能性大大增加,一旦网络通信出现了故障会导致shuffle file cannot find 由于这个错误导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。
8.2.2. 合并机制

执行流程:
- 合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。
- 在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
总结
- 产生磁盘小文件的个数:
C(core的个数)*R(reduce的个数)。
8.3. SortShuffle
8.3.1. 普通机制

执行流程
-
map task 的计算结果会写入到一个内存数据结构里面,内存数据结构默认是5M。 -
在shuffle的时候会有一个定时器,不定期的去估算这个内存结构的大小,当内存结构中的数据超过5M时,比如现在内存结构中的数据为5.01M,那么他会申请5.01*2-5=5.02M内存给内存数据结构。 -
如果申请成功不会进行溢写,如果申请不成功,这时候会发生溢写磁盘。 -
在溢写之前内存结构中的数据会进行排序分区 -
然后开始溢写磁盘,写磁盘是以batch的形式去写(批量),一个batch是1万条数据。 -
map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件。 -
reduce task去map端拉取数据的时候,首先解析索引文件,根据索引文件再去拉取对应的数据。
总结
- 产生磁盘小文件的个数:
2*M(map task的个数)
8.3.2. bypass机制
bypass机制示意图

总结
8.4. Shuffle文件寻址
-
MapOutputTracker
MapOutputTracker是Spark架构中的一个模块,是一个主从架构。管理磁盘小文件的地址。MapOutputTrackerMaster是主对象,存在于Driver中。MapOutputTrackerWorker是从对象,存在于Excutor中。 -
BlockManager
-
BlockManager:块管理者,是Spark架构中的一个模块,也是一个主从架构。 -
BlockManagerMaster,主对象,存在于Driver中。 -
BlockManagerWorker,从对象,存在于Excutor中。
BlockManagerMaster会在集群中有用到广播变量和缓存数据或者删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据BlockManagerWorker,从对象,存在于Excutor中。BlockManagerWorker会与BlockManagerWorker之间通信。 -
无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象:
DiskStore:负责磁盘的管理。MemoryStore:负责内存的管理。ConnectionManager:负责连接其他的BlockManagerWorker。BlockTransferService:负责数据的传输。
Shuffle文件寻址图

-
Shuffle文件寻址流程 -
当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MapStatus对象中,通过MapOutputTrackerWorker对象向Driver中的MapOutputTrackerMaster汇报。 -
在所有的map task执行完毕后,Driver中就掌握了所有的磁盘小文件的地址。 -
在reduce task执行之前,会通过Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster获取磁盘小文件的地址。 -
获取到磁盘小文件的地址后,会通过BlockManager中的ConnectionManager连接数据所在节点上的ConnectionManager,然后通过BlockTransferService进行数据的传输。 -
BlockTransferService默认启动5个task去节点拉取数据。默认情况下,5个task拉取数据量不能超过48M。
8.5. Shuffle调优
- 在代码中,不推荐使用,硬编码。
new SparkConf().set("spark.shuffle.file.buffer","64")
- 在提交spark任务的时候,推荐使用。
spark-submit --conf spark.shuffle.file.buffer=64 –-conf ….
- 在
conf下的spark-default.conf配置文件中,不推荐,因为是写死后所有应用程序都要用。
9、Spark RDD小结
9.1. Spark application概念小结

Application:基于Spark的用户程序,包含driver程序和运行在集群上的executor程序,即一个完整的spark应用 。
- 一个application包含多个Job(作业)。
Job :包含多个Stage(阶段),与action算子一 一对应。Stage:一个job会被拆分成很多组任务Stage(就像MapReduce分为MapTask和ReduceTask一样)。- Task:一个被发送到executor上的工作单元。一个stage由多个Task组成(由partition决定)。
- pipline:一个Pipeline对应一个Task。一个stage由多个pipeline组成(由task决定)。
Task:被发送到**executor上的工作单元**。- RDD:一个RDD包含了多个Partition
- partition:多个Partition是并行操作的(并行度)
9.2. Spark 代码 运行流程

10、Spark资源调度和任务调度

10.1. 调度流程
-
启动集群后,Worker节点会向Master节点汇报资源情况,Master掌握了集群资源情况。 -
当Spark提交一个Application后,根据RDD之间的依赖关系将Application形成一个DAG有向无环图。 -
任务提交后,Spark会在Driver端创建两个对象:DAGScheduler和TaskScheduler,DAGScheduler是任务调度的高层调度器,是一个对象。 -
DAGScheduler的主要作用就是将DAG根据RDD之间的宽窄依赖关系划分为一个个的Stage,然后将这些Stage以TaskSet的形式提交给TaskScheduler(TaskScheduler是任务调度的低层调度器,这里TaskSet其实就是一个集合,里面封装的就是一个个的task任务,也就是stage中的并行的task任务)。 -
TaskSchedule会遍历TaskSet集合,拿到每个task后会将task发送到Executor中去执行(其实就是发送到Executor中的线程池ThreadPool去执行)。 -
task在Executor线程池中的运行情况会向TaskScheduler反馈,当task执行失败时,则由TaskScheduler负责重试,将task重新发送给Executor去执行,默认重试3次。如果重试3次依然失败,那么这个task所在的stage就失败了。 -
stage失败了则由DAGScheduler来负责重试,重新发送TaskSet到TaskScheduler,Stage默认重试4次。如果重试4次以后依然失败,那么这个job就失败了。job失败了,Application就失败了。 -
TaskScheduler不仅能重试失败的task,还会重试straggling(落后,缓慢)task(也就是执行速度比其他task慢太多的task)。如果有运行缓慢的task那么TaskScheduler会启动一个新的task来与这个运行缓慢的task执行相同的处理逻辑。两个task哪个先执行完,就以哪个task的执行结果为准。这就是Spark的推测执行机制。在Spark中推测执行默认是关闭的。推测执行可以通过spark.speculation属性来配置。 -
注意: -
-
对于ETL类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。 -
如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有task重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。
10.2. 流程图解

10.3. 粗细粒度资源申请
10.3.1. 粗粒度资源申请(Spark)
-
在Application执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的task执行完成后,才会释放这部分资源。 -
优点:在Application执行之前,所有的资源都申请完毕,每一个task直接使用资源就可以了,不需要task在执行前自己去申请资源,task启动就快了,task执行快了,stage执行就快了,job就快了,application执行就快了。 -
缺点:直到最后一个task执行完成才会释放资源,集群的资源无法充分利用。
10.3.2. 细粒度资源申请(MR)
Application执行之前不需要先去申请资源,而是直接执行,让**job中的每一个task在执行前自己去申请资源,task执行完成就释放资源**。- 优点:集群的资源可以充分利用。
- 缺点:
task自己去申请资源,task启动变慢,Application的运行就响应的变慢了。
11、案例解答
spark案列解答.https://blog.csdn.net/weixin_43660536/article/details/119610731
12、广播变量和累加器
12.1. 广播变量

-
sparkScontext.broadcast() 方法用来定义广播变量 -
广播变量只能在Driver端定义与修改,不能在Executor端定义与修改。 -
会将广播变量封装发送到Executor的BlockManage中。 -
同个Executor的Task共享BlockManage的数据。
val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello yjx")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach {println}
sc.stop()
12.2. 累加器

val accumulator = sparkContext.longAccumulator定义累加器- 累加器在
Driver端定义赋初始值,累加器只能在Driver端读取,在Excutor端更新。
val conf = new SparkConf()
conf.setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.longAccumulator
sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
println(accumulator.value)
sc.stop()
13、spark核心源码解析
spark核心源码解析.https://blog.csdn.net/weixin_43660536/article/details/119610936
14、Spark内存管理
Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。
Spark内存管理分为静态内存管理和统一内存管理,Spark1.6之前(不包含)使用的是静态内存管理,Spark1.6之后引入了统一内存管理。
静态内存管理中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。
统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。
Spark1.6及1.6版本之后的版本默认使用的是统一内存管理。
要想使用静态内存可以通过参数spark.memory.useLegacyMode
? 设置为true(默认为false)使用静态内存管理。
静态内存管理分布图

统一内存管理分布图

reduce 中OOM如何处理?
-
减少每次拉取的数据量。 -
提高shuffle聚合的内存比例。 -
提高Excutor的总内存。
|