三, MapReduce框架原理

MapReduce分为两个阶段, 一个是Map阶段,我们统称为MapTask; 另一个是Reduce阶段, 我们统称为ReduceTask;
- 在MapTask中, 根据Input数据源, 通过InputFormat对数据进行处理(1.对输入数据切片 2. 实现以某种方式读取切片数据)), 读取的数据在Mapper(用户实现的逻辑)中进行处理.
- 在Mapper之后, Reducer之前, 进行Shuffle(混洗), 排序, 分区, 合并
- 在ReduceTask中,
[问题1]: 什么是InputFormat ? 其作用是什么?
-
平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(CombineTextInputFormat.class);来保证输入文件按照我们想要的格式被读取。所有的输入格式都继承于InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的FileInputFormat,用来读取数据库的DBInputFormat等等。 -
其实,一个输入格式InputFormat,主要无非就是要解决如何将数据切片[比如多少数据/文件为一个切片] ,以及如何读取切片的数据[比如按行读取], 生成相应的K-V供Mapper读取 。前者由getSplits()完成,后者由RecordReader完成。
[问题2]: 什么是OutputFormat ? 其作用是什么?
3.1 InputFormat 数据输入
3.1.0 切片与MapTask并行度决定机制
-
问题引入: MapTask的并行度决定Map阶段的任务处理并发度, 进而影响到整个Job的处理速度. Q: 1G的数据, 启动8个MapTask, 可以提高集群的并发处理能力. 那么1K的数据也启动8个MpaTask, 会提高集群性能吗? MapTask并发任务是否越多越好呢? 哪些因素影响了MapTask并行度? -
MapTask并行度决定机制
- 数据块: Block是HDFS物理上把数据分为一块一块. 数据块是HDFS的物理存储数据单元.
- 数据切片: 数据切片只是在
逻辑上 对输入进行分片, 并不会在磁盘上将其切成片进行存储. 数据切片是Mapreduce程序计算输入数据的单位, 一个切片会对应启动一个MapTask.

3.1.1 Job提交流程源码和切片源码详解
在深入了解InputFormat接口的作用及它的各种子类之间的区别之前, 我们先理解一下Job提交和数据切片的过程(这里先拿FileInputFormat 的子类TextInputFormat的切片为例):
详见此文: MapReduce - Job提交和切片流程源码详解
3.1.2 FileInputFormat 切片机制

3.1.2.1 FileInputFormat类的切片过程
[切片过程] 
3.1.2.2 FileInputFormat 切片大小的参数配置
- 源码中计算切片大小的代码和配置项
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
mapreduce.input.fileInputformat.split.minsize=1, 默认值为1
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue, 默认值为 Long.MAXValue.
- 因此, 默认情况下, 切片大小=blockSize;
- 切片大小的设置
-
maxSize(切片最大值)–使切片变小:
- maxSize调的比blockSize小, 则会让切片变小, 而且实际切片大小就等于maxSize
-
minSize(切片最小值)–使切片变大
- minSize调的比BlockSize(此时blockSize > maxSize)大, 则可以让实际切片大小=minSize
- 获取切片信息的API
String name = inputSplit.getPath().getName();
FileSplit inputSplit = (FileSPlit) context.getInputSplit();
3.1.5 结构梳理: InputFormat 抽象类和它的各种子类
[InputFormat 子类]
-
在运行 MapReduce 程序时,输入的文件格式包括:基于行的日志文件、二进制格式文件、数据库表等。为了处理这些文件, Hadoop设计了InputFormat抽象类, 前面我们提过一嘴, 这个抽象类的作用是对输入的数据进行切片getSplits()并规定如何读取输入数据createRecordReader() , 由于输入文件类型的不同, 比如处理数据库表的DBInputFormat, 处理普通文本文件的FileInputFormat, 切片方式的不同, 读取切片数据的方式不同, InputFormat抽象类有着各种各样的子类. 我们重点应掌握的是InputFormat抽象子类, FileInputFormat及其子类TextInputFormat , 孙子类CombineTextInputFormat -
Input接口及其子类结构如下图所示: -
我们重点关注框图内的类; 
tips: IDEA下,
- ctrl+h 查看接口/抽象类的类结构
- ctrl+F12 查看类中实现的方法
[框内常用类及方法的类图] (待补充)
[类图中涉及到的类的完整变量, 方法]
-
InputFormat (抽象类, 抽象方法) -
FileInputFormat (抽象类, 定义了切片方式(即对普通文件进行默认切片(单文件单独切片)), 未定义如何读取切片)  -
TextInputFormat (定义了读取切片的方式K,距首行偏移量, V-一行)  -
CombineFileInputFormat (抽象类, 定义了切片方式(多个文件切片等))  -
CombineTextInputFormat (定义了读取切片的方式) 
[FIleInputFormat 实现类]
- FileInputFormat 常见的子类包括:TextInputFormat、ConbineFileInputFormat(抽象类), KeyValueTextInputFormat、NLineInputFormat、CoTextInputFormat 和自定义 InputFormat 等。
- 其中需要熟练掌握
TextInputFormat 和 CombineFileInputFormat(抽象类)中的子类CombineTextInputFormat .
3.1.5.1 TextInputFormat
特点:
- TextInputFormat 是默认的FIleInputFormat实现类.
- 定义了读取切片的方式, 按行读取每条记录.
- 返回值, <KEYIN, VALUEIN>=<LongWritable, Text>, 键是存储在该行在整个文件中的起始字符偏移量, Longwritable类型; 值是这一行的内容, 不包括任何行终止符号(换行, 回车符号), Text类型.

举个栗子: 
实际应用: 经典WordCount案例实操
3.1.5.2 CombineTextInputFormat
- Mapreduce中默认的TextInputFormat切片机制是对任务按文件规划切片, 不管文件多小, 都会是单独进行切片, 由于一个切片对应于一个MapTask. 所以, 如果有大量的小文件, 就会产生大量的MapTask, 处理效率及其低下.
-
应用场景 CombineTextInputFormat 用于小文件过多的场景, 它可以将多个小文件从逻辑上规划到一个切片中, 这样, 多个小文件就可以交给一个MapTask处理 -
虚拟存储切片最大值设置 CombineTextInputFormat.setMaxInputSplkitSize(job, 4194304);//4MB
注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值.
- CombineTextInputFormat 切片机制
生成切片的过程: 分为虚拟存储过程 和切片过程 .
[虚拟存储过程]
将输入目录下所有文件大小, 依次和设置的 setMaxInputSplitSize值比较:
- 如果
不大于设置的最大值 , 逻辑上划分为一个块, - 如果
大于设置的最大值且大于两倍 , 以设置最大值切割出一块 - 当剩余数据大小
大于设置的最大值却不大于两倍 , 将剩余文件均分为2个虚拟存储块(防止出现太小切片)
举个栗子: setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M;
- 8.02MB > 设置最大值的两倍, 切割出 4MB一块;
- 剩余 4.02MB, 大于设置最大值却不大于两倍, 均分即可, 切割出2.01MB, 2.01MB 两块.
- 所以8.02MB总共分成了 4MB, 2.01MB, 2.01MB 三个虚拟存储块
[切片过程]
- 判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片。
- 如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
举个栗子: 
再举个栗子:

3.1.5.2.1 CombineInputFormat案例实操
[准备工作]
- 准备四个普通文本文件放到输入目录, 大小如图所示

[代码编写]
前面我们学习了TextInputFormat 的wordCount案例: 对于专门用于处理文本文件的 FileInputFormat接口, 我们在编写WordCount时都是默认实现其中的TextInputFormat类(即按文件单独切片)
- 但是, 由于大量小文件单独切片会造成集群中开启大量的MapTask, 从而使集群资源占用高甚至集群瘫痪, 所以我们引入了
CombineTextInputFormat , 它能够把小文件集中起来按照设置的切片最大值(setMaxInputSplitSize)进行切片.
而且, 实现CombineTextInputFormat只需要在原有的WordCount基础上对Driver类添加下面的语句即可
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
[与TextInputFormat的比较]
实现TextInputFormat类的WordCount切片数目:
- 在控制台比较靠前的位置, 我们可以查看
number of splits: 4 , 即 前面的栗子, a,b,c,d 4个文件分成了4个切片. 
实现CombineTextInputFormat类的WordCount切片数目:
- 删除输出目录, 把前面介绍的两句代码添加到Driver类中, 重新执行WordCount程序可以发现切片变成了3片,
- 为什么是3片呢? 再来回顾一下:
- 已知输入的文件大小:1.7, 5.2, 3.4, 6.9, 最大块大小 4MB
- 虚拟存储块:
1.7, 2.6, 2.6(因为5.2>4, 但是小于4的2倍嘛), 3.4, 3.45, 3.45 - 切片:
4.3(1.7+2.6), 6, 6.9MB , 所以是3片 
[拓展]
- 如果我们想把这四个文件都放到一个切片来处理, 应该如何设置呢?
- 简单, 把
setMaxSplitSize(job, size) 的size值设置的比所有输入文件总大小还大就可以了.
3.2 MapReduce 工作流程
MapReduce 详细工作流程(一) 
- 输入文件
- Q: 切片?
- 客户端提交(jar包, 切片规划文件, xml配置文件)到Yarn RM
- Yarn根据切片计算出MapTask的数量
- MapTask启动, 开始处理分到的切片文件. 使用InputFormat接口(默认是实现类 TextInputFormat)处理文件, 主要实现
- MapTask读取输入数据, 交给Mapper
- Mapper中是用户自己实现的业务逻辑, 对输入的k-v进行处理
MapReduce 详细工作流程(二)
Shffle 过程详解:
- MapTask收集我们的map()方法输出的k-v对, 放到内存缓冲区;
- 从内存缓冲区不断的移除本地磁盘文件, 可能会溢出多个文件;
- 多个溢出文件会被合并成大的溢出文件
- 在溢出过程及合并的过程中, 都要调用Partitioner进行分区和针对key进行排序
- ReduceTask根据自己的的分区号, 去各个MapTask机器上取响应的结果分区数据;
1.
3.3 Shuffle机制
- Map方法之后, Reduce方法之前的数据处理过程叫做Shuffle(混洗);

- map后进入一个getPatition(), 标记文件位于哪个分区
- 进入环形缓冲区(默认100MB), 左侧存索引, 右侧存数据.到达80%的时候反向溢写(留出20%的溢写时间), 注意在溢写之前, 要根据Key的索引对数据进行排序(快排), 按照字典顺序排序.
- 排序之后, 开始溢写, 溢写产生的文件是spill.index 和 spill.out
- Combiner为可选流程, 可以规整重复的数据, 提高传输效率
3.4 OutputFormat 数据输出
3.5 MapReduce 内核源码解析
3.6 Join 应用
3.7 数据清洗(ETL)
3.8 MapReduce开发总结
四, Hadoop数据压缩
五, 常见错误和解决方案
|