IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spark Shuffle和Mapreduce Shuffle -> 正文阅读

[大数据]Spark Shuffle和Mapreduce Shuffle

Spark Shuffle和Mapreduce Shuffle的区别

MR Shuffle

MR shuffle
Spark Shuffle中包括Hash Shuffle(优化和未优化)、sortShuffle、BypassMergeSortShuffle

MR Shuffle包括Map Shuffle和Reduce Shuffle

//MR Shuffle
MapShuffleMap方法之后开始:环形缓冲区刷写、分区排序(分区标记在进入环形缓冲区之前已经打上、排序在刷写之前发生)、combine预聚合、归并排序、压缩

ReduceShuffleReducer复制数据开始:copy(ReducerMapper端拉取数据)、sort(对数据再做合并和归并排序,分组后进入reduce方法)、reduce(reduce用户处理逻辑)
    
//Spark Shuffle
主要分为write和read两个部分:
    
RDD中每一个分区就对应一个shuffleMapTask;
每一个shuffleMapTask会为每一个resultTask生成一个bucket;(优化后可共用,通过索引文件的方式来区分不同MapTask对应的数据)    

Spark Shuffle

未优化的HashShuffle:

1、上图有2个CPU,可以同时运行两个ShuffleMapTask

2、每个task将写buket缓冲区,缓冲区的数量和reduce任务的数量相等

3、 每个buket缓冲区会生成一个对应ShuffleBlockFile

4、ShuffleMapTask 如何决定数据被写到哪个缓冲区呢?这个就是跟partition算法有关系,这个分区算法可以是hash的,也可以是range的 ;

5、最终产生的ShuffleBlockFile会有多少呢?就是ShuffleMapTask 数量乘以reduce的数量,这个是非常巨大的

在这里插入图片描述

优化后的HashShuffle:(consolidation机制)(

在同一核CPU执行先后执行的ShuffleMapTask可以共用bucket缓冲区,然后写到同一份ShuffleFile里去,上图所示的ShuffleFile实际上是用多个ShuffleBlock构成,那么,那么每个worker最终生成的文件数量,变成了cpu核数乘以reduce任务的数量,大大缩减了文件量。

优点:减缓了小文件过多的问题;
在这里插入图片描述

sortShuffle:

通过排序建立索引,相比较于hashShuffle,它只有一个临时文件,不管有多少个reduceTask都只有一个临时文件;

在该模式下,数据会先写入一个数据结构,聚合算子写入Map,一边通过Map局部聚合,一遍写入内存。Join算子写入ArrayList直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。

在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个task过程会产生多个临时文件。

最后在每个task中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个task的数据在文件中的索引,start offset和end offset。
这样算来如果第一个stage 50个task,每个Executor执行一个task,那么无论下游有几个task,就需要50个磁盘文件。(磁盘文件数只与MapTask数目有关、一个MapTask对应一个磁盘文件)

在这里插入图片描述

BypassMergeSortShuffle:

//bypass机制运行条件:
shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
不是聚合类的shuffle算子(比如reduceByKey)。
    
在这种机制下,当前stage的task会为每个下游的task都创建临时磁盘文件。将数据按照key值进行hash,然后根据hash值,将key写入对应的磁盘文件中(个人觉得这也相当于一次另类的排序,将相同的key放在一起了)。最终,同样会将所有临时文件依次合并成一个磁盘文件,建立索引。
该机制与未优化的hashshuffle相比,没有那么多磁盘文件,下游task的read操作相对性能会更好。
该机制与sortshuffle的普通机制相比,在readtask不多的情况下,首先写的机制是不同,其次不会进行排序。这样就可以节约一部分性能开销。
// 写机制不同,没有排序过程;
//另外这个Sort-Based Shuffle跟Executor核数没有关系,即跟并发度没有关系,它是每一个ShuffleMapTask都会产生一个data文件和index文件,所谓合并也只是将该ShuffleMapTask的各个partition对应的分区文件合并到data文件而已。所以这个就需要个Hash-BasedShuffle的consolidation机制区别开来。(consolidation机制最后文件数是和CPU数和ResultTask数目有关)

在这里插入图片描述

Shuffle Read:

Shuffle读由reduce这边发起,它需要先到临时文件中读,一般这个临时文件和reduce不在一台节点上,它需要跨网络去读。但也不排除在一台服务器。不论如何它需要知道临时文件的位置,
      这个是谁来告诉它的呢?它有一个BlockManager的类。这里就知道将来是从本地文件中读取,还是需要从远程服务器上读取。
       读进来后再做join或者combine的运算。
       这些临时文件的位置就记录在Map结构中。
       可以这样理解分区partition是RDD存储数据的地方,实际是个逻辑单位,真正要取数据时,它就调用BlockManage去读,它是以数据块的方式来读。
       比如一次读取32k还是64k。它不是一条一条读,一条一条读肯定性能低。它读时首先是看本地还是远程,如果是本地就直接读这个文件了,
       如果是远程,它就是发起一次socket请求,创建一个socket链接。然后发起一次远程调用,告诉远程的读取程序,读取哪些数据。读到的内容再通过socket传过来。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-03 17:23:07  更:2021-08-03 17:23:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/21 19:38:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码