shuffle
为什么要有shuffle
shuffle:为了让相同的key进入同一个reduce 每一个key对应的value不一定都在同一个分区中,也未必都在同一个节点上,而是极可能分布在各个节点上
shuffle分类
Shuffle Write
发生在map端的shuffle,需要将数据进行分组排序,将数据写出去
Shuffle Read
发生在reduce端的shuffle,将数据读进来
shuffle可能会面临的问题
为了让不同分区的文件进入同一个reduce(这里有三个reduce),这里会形成map的数量*reduce数量的文件,reduce的数量由分区数决定,分区数量由切片数量决定,切片数量差不多是128MB一个切片 假如我们这里有10G的文件,128MB切一个文件,差不多就是80个文件,那这里就有80*80=6400个文件,小文件非常多,会很影响效率
HashShuffle优化解决问题
每个Executor都带1核,每次最多只能执行一个task,这里面的task依次运行,每次只能生成一个文件,每一个Executor最终生成和reduce个数对应的文件 这里的文件就是核数reduce个数,而凡是shuffle类的算子都可以指定分区
reduce分区数决定因素
val conf: SparkConf = new SparkConf()
.setMaster("local")
.setAppName("Demo10Broadcast")
//通过conf设置spark默认的并行度
conf.set("spark.default.parallelism","4")
val sc: SparkContext = new SparkContext(conf)
//在集群中运行的时候,默认最少是两个分区
val stuRDD: RDD[String] = sc.textFile("D:\\BigDaTa\\JAVA_Project\\ShuJia01\\data\\students.txt",2)
println(stuRDD.getNumPartitions) //2个分区
//如果没有产生shuffle,那么子RDD的分区数由父RDD的分区数决定
val wordsRDD: RDD[String] = stuRDD.flatMap(_.split(","))
println(wordsRDD.getNumPartitions)
val mapRDD: RDD[(String, Int)] = wordsRDD.map(word=>(word,1))
println(mapRDD.getNumPartitions)
val repRDD: RDD[(String, Int)] = mapRDD.repartition(4)
println(repRDD.getNumPartitions)
//shuffle类的算子可以手动指定分区数
//相当于手动设置reduce个数
val wordCntRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_,3) //这里如果不指定为3的话,就会使用默认的并行度4
println(wordCntRDD.getNumPartitions)
/**
* shuffle类算子产生的RDD的分区数决定因素
* 1、如果没有指定,默认和父RDD分区数一样
* 2、可以手动指定修改分区数
* 3、通过默认参数设置
*
* 手动设置 > 默认设置 > 父RDD的分区数
*/
}
SortShuffle
spark使用的shuffle机制
- 普通运行机制
- bypass运行机制
相比较普通运行,少了排序,reduce个数小于200,默认这种方式
这里的内存数据初始化时5MB,不够用了就*2 索引文件可以区分不同的分区,在找文件的时候可以更快一些 这里最后产生的文件的数量就是 2×分区 的数量
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
|