概述
- MapReduce是分布式计算框架
适用于海量数据的离线批处理 不适合实时计算、DAG计算、迭代式计算的场景 是HIVE引擎之一
数据输入
切片与MapTask并行度机制
- 数据块:Block是HDFS物理上把数据分成一块一块
- 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储
- 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
- 每一个Split切片分配一个MapTask实例,并行处理
- 默认,
切
片
大
小
=
B
l
o
c
k
S
i
z
e
切片大小=BlockSize
切片大小=BlockSize
FileInputFormat切片机制
- 切片是针对文件,而不是数据的整体
- 一个文件至少一个MapTask,如果小文件过多,就会有很多MapTask,开销很大
CombineTextInputFormat切片机制
- 将输入目录下所有文件大小,依次跟
setMaxInputSplitSize 设置值比较 如果小于设置值,逻辑上划分一个块; 如果输入文件大于设置值且大于两倍,那么以最大值切割一块, 直到
设
置
值
<
剩
余
数
据
大
小
<
设
置
值
×
2
设置值<剩余数据大小<设置值 \times 2
设置值<剩余数据大小<设置值×2,将文件均分成2个虚拟存储块。
例如setMaxInputSplitSize 值为4M,输入文件大小为9M,则先逻辑上分成一个4M,剩余的大小为5M,如果按照4M逻辑划分,就会出现1M的小的虚拟存储文件,所以将剩余的5M文件切分成2.5M和2.5M两个文件
MapReduce工作流程
Shuffle机制
分区
默认分区是根据key的hashCode对ReduceTasks个数取模得到的
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- 如果
R
e
d
u
c
e
T
a
s
k
数
量
>
g
e
t
P
a
r
t
i
t
i
o
n
的
结
果
数
ReduceTask数量>getPartition的结果数
ReduceTask数量>getPartition的结果数,就会产生多个空的输出文件
part-r-000xx - 如果
1
<
R
e
d
u
c
e
T
a
s
k
数
量
<
g
e
t
P
a
r
t
i
t
i
o
n
的
结
果
数
1<ReduceTask数量<getPartition的结果数
1<ReduceTask数量<getPartition的结果数,一些分区就会无处安放,会报异常
- 如果
R
e
d
u
c
e
T
a
s
k
的
数
量
=
1
ReduceTask的数量=1
ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件
part-r-00000 - 分区号从零开始,逐一累加
例子:假设自定义分区数为5 job.setNumReduceTasks(1); 正常运行,只产生一个输出文件 job.setNumReduceTasks(2); 报错 job.setNumReduceTasks(6); 正常运行,会产生空文件
排序
-
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序 -
MapTask 会将处理的结果暂时放到环形缓冲区,当环形缓冲区使用率达到阈值,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘; 而当数据处理完毕后,它会对磁盘上所有文件进行归并排序 -
ReduceTask 从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,就溢写磁盘,否则存储在内存中。 如果磁盘上文件数目达到阈值,就进行一次归并排序以生成一个更大文件; 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序
MapTask工作机制
- 1、Read阶段:MapTask通过用户编写的
RecordReader ,从输入InputSplit 中解析出一个个key/value。 - 2、Map阶段:该节点主要是将解析出的key/value交给用户编写
map() 函数处理,并产生一系列新的key/value。 - 3、Collect收集阶段:在用户编写
map() 函数中,当数据处理完成后,一般会调用OutputCollector.collect() 输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。 - 4、Spill溢写阶段:当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
- 4.1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
- 4.2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件
output/spillN.out (N 表示当前溢写次数)中。如果用户设置了Combiner ,则写入文件之前,对每个分区中的数据进行一次聚集操作。 - 4.3:将分区数据的元信息写到内存索引数据结构
SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index 中。 - 5、Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
- 6、当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件
output/file.out 中,同时生成相应的索引文件output/file.out.index 。 - 7、在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并
io.sort.factor (默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 - 8、让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。
ReduceTask工作机制
- Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
- Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
- Sort阶段:按照MapReduce语义,用户编写
reduce() 函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。 - Reduce阶段:
reduce() 函数将计算结果写到HDFS上。
设置ReduceTask个数
job.setNumReduceTasks(4);
SET hive.exec.reducers.bytes.per.reducer;
SET hive.exec.reducers.max;
SET mapred.reduce.tasks;
- ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致
- ReduceTask默认值就是1,所以输出文件个数为一个
- 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
- ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,例如要计算全局汇总结果,就只能有1个ReduceTask
- 具体多少个ReduceTask,需要根据集群性能而定
- 如果分区数不是1,ReduceTask为1,就不执行分区过程。因为执行分区的前提是先判断ReduceNum个数是否大于1,不大于1肯定不执行
JOIN
Reduce Join
Map Join
- Map Join适用于一张小表JOIN一张大表的场景
- 在HIVE中,MapJoin把小表全部加载到内存在map端进行join,避免reducer处理
SET hive.auto.convert.join=true;
SET hive.mapjoin.smalltable.filesize;
HIVE的MapJoin原理图
计数器
- Hadoop为每个作业维护若干内置计数器,以描述多项指标
例如 某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量和已产生的输出数据量
数据输出
OutputFormat 是MapReduce输出的基类
|