1.MapReduce数据流程概览
MapReduce计算框架整体分为四大部分:
- source端就是InputFormat;
- 中间分布式计算就是Mapper和Reducer;
- sink就是OutputFormat
2. InputFormat
InputFormat处理数据的第一步;将原始数据处理成KV形式交给Mapper处理; InputFormat是一个接口,拥有各种实现类应用于不同的场景中,将在后面一一介绍; InputFormat的主要功能就是对输入数据文件进行切片 和 处理成KV
2.1 切片与MapTask并行度决定机制
(1)数据块和数据切片的概念
数据块:Block是HDFS物理上把文件分成一块一块。数据块是HDFS存储数据单位。 数据切片:数据切片只是对一个数据文件在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。数据切片是MapReduce程序计算输入数据的单位。一个切片会对应启动一个MapTask。
(2)切片和maptask的关系
maptask是并行度,并行度越高,处理越快;一个MapReduce可能处理多个文件,而所有文件的总切片个数决定了启动多少个maptask; 切片大小一般和块大小保持一致;
(3)切片大小的设定
Yarn优化原则:尽量在数据切片所在的节点启动MapTask来处理切片数据。这是为了避免网络传输。 比如按照100M切片,Node1上128M,产生28M的网络传输,Node2上128M,加上Node1传递来的28M,Node2需要往Node3传递56M的数据,这样整个集群就产生了28+56=84M的流量! 尽管每个MapTask处理的切片同样大,集群整体的处理时间最短,但是加上IO时间,集群整体的处理性能下降!
(4)切片是针对文件个体!
一个目录总共有500M,分为300M,100M,100M三个文件,那么按照128M切片,产生:128M+128M+44M,100M,100M五个切片; 所以切片是以文件为单位的
2.2 Job提交流程源码
- Job.waitForCompletion(true)提交任务
- Connection() 判断是本地还是yarn集群
- 创建一个…/…/staging路径用于保存一些任务信息
- 生成jobid,并创建文件
- 切片
- 将job.split、job.xml、job.jar包提交到4步生成的文件中
切片的时刻已经确定了,但是具体怎么切是要看实际开发中用的是哪一个InputFormat的实现类,接下来就来看看一些实现类如何切片的
2.3 FileInputFormat切片源码
FileInputFormat是InputFormat接口的子类,专门用于输入类型是文件的数据源
2.4 FileInputFormat切片机制
(1)如何切
(2)切片大小设置
2.5 TextInputFormat
(1)切片机制
切片方式和FileInputFormat是一样的
(2)产生的kv
TextInputFormat输出的key-value对是(行偏移量,行内容)
2.6 KeyValueTextInputFormat
(1)切片机制
切片方式和FileInputFormat是一样的
(2)产生的kv
2.7 NLineInputFormat
(1)切片机制
(2)产生的kv
kv和TextInputFormat是一样的
2.8 CombineTextInputFormat
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
(1)应用场景:
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
(2)虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m 注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值
(3)切片机制
(1)虚拟存储过程: 将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较。 ① 如果不大于设置的最大值,逻辑上划分一个块。 ② 如果输入文件大于setMaxInputSplitSize且大于两倍,那么以setMaxInputSplitSize切割一块; ③ 当剩余数据大小超过setMaxInputSplitSize且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。 例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程: (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。 (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片,超了setMaxInputSplitSize,就是一个切片,没有超过,就继续和下一个虚拟存储文件合并。 (c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为: 1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M) 最终会形成3个切片,大小分别为: (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
(4)CombineTextInputFormat实操
用之前的wordcount程序,准备如下数据目录和数据文件:
|