一、简介
MapReduce是Hadoop的核心组件之一,用于大数据的并行 处理的计算模型、框架和平台,主要解决海量数据 的计算,是目前分布式计算模型应用较为广泛的一种;
1、核心思想
采用分而治之 的方法,将复杂问题分解成各个小问题,在逐个解决;且最后再将各个小问题的解进行整合即整个问题的解;
2、处理的主要事务
2.1 Map
主要负责将任务分解 ,把复杂的任务划分为若干个 简单的任务(此类任务之间无必然联系,可单独执行)来并行处理 ;
- map函数的接收数据格式是键值对,产生的输出也是键值对的形式;
- map中产生的数据有可能不经过reduce,直接被写入HDFS;
2.2 Reduce
负责将任务合并,将Map阶段的结果进行全局汇总;
- 以map返回的键值对作为输入,将相同的key进行汇总输出新的键值对;
- 在任务较多的情况下,阔以使用多个reduce进行处理;
3、工作原理
3.1 主要流程
- 分片、格式化数据源;
- 执行MapTask;
- 执行Shuffle过程;
- 执行ReduceTask;
- 写入文件;
3.2 分片、格式化数据
输入的数据源必须经过此步骤;
分片:
将源文件划分为大小相等的小数据块(2.x下默认128M),Hadoop会为每一个分片构建一个Map任务,并由该任务运行自定义的map函数,从而处理分片里面的每一条记录;
格式化:
将划分好的分片格式化为键值对的数据,key为偏移量,value为每一行的内容;
3.3 执行MapTask
每个Map任务都有一个内存缓冲区 〈缓冲区大小100M),输入的分片数据经过Map任务处理后的中间结果,会写入内存缓冲区中。如果写入的数据达到内存缓冲的阀值(80M) ,会启动一个线程将内存中的溢出数据写入磁盘 ,同时不影响map中间结果继续写入缓冲区。在溢写过程中,MapReduce框架会对Key进行排序 ,如果中间结果比较大,会形成多个溢写文件 ,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件,如果多个,则合并为一个文件。
需要经历5个阶段:Read、Map、Collect、Spill、Combiner ;
- Read:
MapTask 通过用户编写的RecordReader ,从输入的InputSplit 中解析出一个个key/value ; - Map:将解析出的
key/value 交给用户编写的map()函数 处理,并产生一系列新的key/value ; - Collect:在用户编写的
map()函数 中,数据处理完成后,一般会调用outputCollector.collect() 输出结果,在该函数内部,它会将生成的 key/value 分片(通过调用partitioner ),并写入一个环形内存缓冲区 中; - Spill:即
溢写 ,当环形缓冲区满 后,MapReduce会将数据写到本地磁盘 上,生成一个临时 文件。需要注意的是,将数据写入本地磁盘前,先要对数据进行一次本地排序 ,并在必要时对数据进行合并 、压缩 等操作。 Combine:当所有数据处理完成以后,MapTask会对所有临时文件 进行一次合并 ,以确保最终只会生成一个数据文件 。
3.4 执行Shuffle
是map阶段处理的数据如何传递给Reduce阶段,Shufflc会将MapTask输出的处理结果数据,分发给ReduceTask ,并在分发的过程中,对数据按key进行分区 和排序 ;
map
- MapTask处理的结果会暂且放入一个
内存缓冲区 中(默认100M ),当缓冲区快要溢出 时(默认达到80% ),会在本地文件系统创建一个溢出 文件,将该缓冲区的数据写入这个文件; - 写入
磁盘 之前,线程会根据reduscTask 的数量,将数据分区,一个Reduce任务对应一个分区的数据。这样做的目的是为了避兔 有些reduce任务分配到大量数据 ,而有些reduce任务分到很少的数据,甚至没有分到数据; - 分完数据后,会对每个
分区 的数据进行排序 ,如果此时设置了Combiner ,将排序后的结果进行combiner操作,这样做的目的是尽可能少 的执行数据写入磁盘的操作; - 当map任务输出
最后一个记录 时,可能有很多溢出文件 ,这事需要将这些文件合并 ,合并的过程中会不断的进行排序和 combine操作,其目的有两个:一是尽量减少每次写入磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件; - 将分区中的数据
拷贝 给对应的reduce 任务;
reduce
- Reduce会接收到
不同map 任务传来的数据,并且每个map传来的数据都是有序 的。如果reduce阶段接收的数据量相当小 ,则直接存储在内存 中,如果数据量超过了该级冲区大小的一定比例 ,则对数据合并后溢写到磁盘 中; - 随着溢写文件的
增多 ,后台线程会将它们合并 成一个更大的有序的文件,这样做是为了给后面的合并节省时间 ; - 合并的过程中会产生许多的
中间文件 ,但MapReduce会让写入磁盘的数据尽可能地少 ,并且最后一次合并的结果并没有写入磁盘 ,而是直接输入到reduceh函数;
3.5 执行ReduceTask
输入ReduceTask的数据流是<key,{value list}> 形式,用户可以自定义reduce() 方法进行逻辑处理,最终以<key,value> 的形式输出;
需要经历5个阶段:Copy 、Merge 、Sort 、Reduce 、Write ;
- Copy:Reduce会从各个
MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阀值 ,则写到磁盘 上,否则直接放到内存 中; - Merge:在
远程拷贝数据 的同时,ReduceTask会启动两个后台线程 ,分别对内存 和磁盘 上的文件进行合并 ,以防止内存使用过多或者磁盘文件过多; - Sort:自定义
reduce() 方法输入数据是按key 进行聚集的一组数据。为了将key相同 的数据聚在一起,Hadoop采用了基于排序 的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序 ,因此,ReduceTask只需对所有数据进行一次归并排序 即可; - Reduce:对排序后的键值对调用
reduce() 方法,键相等的键值对调用一次reduce()方法,每次调用会产生零个或者多个键值对 ,最后把这些输出的键值对写入到HDFS系统 中; - Write:reduce()函数将计算
结果写到HDFS 上;
3.6 写入文件
MapReduce框架会自动把ReduccTask 生成的<key,value> 传入OutputFormat的write 方法,实现文件的写入操作;
4、编程组件
4.1 InputFormat
描述输入数据的格式,主要提供两个功能;
4.2 OutputFormat
用于描述MapReduce程序输出格式和规范的抽象类;
4.3 Combiner
对Map阶段的输出的重复数据先做一次合并 计算,然后把新<key, value> 作为Reduce阶段的输入;
4.4 Mapper
Mapper类是实现Map任务的一个抽象基类,该基类提供一个map()方法;
4.5 Reducer
Map过程输出的键值对,将由Reducer组件进行合并处理,最终的某种形式的结果输出;
4.6 Partitioner
让Map对key进行分区,从而阔以根据不同给的key分发到不同的Reduce去处理,其目的就是将key 均匀分布在ReduceTask ;
5、运行模式
5.1 本地运行模式
在当前的开发环境模拟MapReduce执行环境,处理的数据及结果在本地操作系统;
5.2 集群运行模式
把MapReduce打包成Jar包,提交到Yarn集群上运行任务,由于Yarn集群负责资源管理和任务调度,程序会被框架分发带集群中的节点上并发的执行,因此处理的数据和输出结果到HDFS文件系统中;
6、性能优化
一般从5个方面优化,数据输入、Map阶段、Reduce阶段、Shuffle阶段和其他调优属性;
6.1 数据输入
在执行MapReduce任务前,将小文件进行合并,大量的小文件会产生大量的map任务 ,增大map任务装载的次数 ,而任务的装载比较耗时 ,从而导致MapReduce运行速度较慢。因此我们采用CombineTextInputFormat 来作为输入,解决输入端大量 的小文件场景;
6.3 Map阶段
- 减少溢写次数:通过调整
io.sort.mb 及sort.spill.percent 参数值,增大触发spill的内存上限 ,减少spill次数 ,从而减少磁盘IO 。 - 减少合并次数:通过调整
io.sort.factor 参数,增大merge 的文件数目,减少merge的次数,从而缩短mr处理时间。 - 在 map之后,不影响业务逻辑前提下,先进行
combine 处理,减少IO 。我们在上面提到的那些属性参数,都是位于mapred-default.xml 文件中,这些属性参数的调优方式。
属性名称 | 类型 | 默认值 | 说明 |
---|
mapreduce.task.io.sort.mb | int | 100 | 配置排序map输出时使用的内存缓冲区的大小,默认为100Mb | mapreduce.map.sort.spill.percent | float | 0.80 | map输出内存缓冲和用来开始磁盘溢出写过程的记录边界索引的阈值,即最大使用环形缓冲内存的阈值,一般默认0.8 | mapreduce.task.io.sort.factor | int | 10 | 排序文件,一次最多合并的流数,实际开发中可将这个值设置为100 | mapreduce.task.min.num.spills.for.combine | int | 3 | 运行combiner时,所需的最少溢出文件数 |
6.4 Reduce
- 合理设置map和 reduce数:两个都不能设置太少,也不能设置太多。太少,会导致
task等待 ,延长处理时间;太多,会导致 map、reduce任务间竞争资源 ,造成处理超时等错误; - 设置map、reduce共存:调整
slowstart.completedmaps 参数,使 map运行到一定程度后,reduce也开始运行,减少reduce 的等待时间; - 规避使用reduce:因为reduce在用于连接数据集的时候将会产生大量的
网络消耗 。通过将MapReduce参数 setNumReduceTasks 设置为0来创建一个只有map的作业; - 合理设置reducc端的 buffer:默认情况下,数据达到一个
阈值 的时候,buffer中的数据就会写入磁盘 ,然后reducc会从磁盘中获得所有的数据。也就是说,buffer和 reduce是没有直接关联的,中间多一个写磁盘->读磁盘的过程,既然有这个弊踏,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销。这样一来,设置 buffer需要内存,读取数据需要内存,reduce 计算也要内存,所以要根据作业的运行情况进行调整。
6.5 Shuffle
Shuffle阶段的调优就是给Shufflc尽量多地提供内存空间 ,以防止出现内存溢出 现象,可以由参数`mapred.child.java.opts 来设置,任务节点上的内存大小应尽量大。
|