MapReduce框架原理
InputFormat数据输入
切片与MapTask并行度决定机制
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
问题:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因素影响了 MapTask 并行度?(一个MapTask也就是一台服务器)
数据块:Block 是 HDFS 物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行 存储。
并行度决定机制如下,有多少个切片就有多少个MapReduce:

Job提交流程和切片流程
1、Job提交流程源码如下
waitForCompletion()
submit();
connect();
new Cluster(getConfiguration());
initialize(jobTrackAddr, conf);
submitter.submitJobInternal(Job.this, cluster)
checkSpecs(job);
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
JobID jobId = submitClient.getNewJobID();
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
writeConf(conf, submitJobFile);
conf.writeXml(out);
status = ubmitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

本地运行和Yarn上运行不同的区别是,本地运行不需要提交jar包,而Yarn上运行是需要的
FileInputFormat切片流程如下:

默认情况下切片最小值(minSize)是1,默认最大(maxSize)是Long的最大值;
如果是放在本地运行,那么块大小是32M,在Yarn上运行快大小是128M。
切片大小计算公式为
Math.max(minSize, Math.min(maxSize, block.size));
切片时,是一个一个文件遍历去计算和操作的。
比如129M的文件,存的时候存2块,切的时候只切1片。
FileInputFormat切片机制(input.getSplits(job))


CombineTextInputFormat切片机制
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。因此CombineTextInputFormat切片可用于小文件处理。
虚拟存储切片最大值设置如下:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
切片机制如下:
生成切片过程包括:虚拟存储过程和切片过程二部分。
1、虚拟存储过程:
将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时将文件均分成 2 个虚拟存储块(防止出现太小切片)。
例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。
2、切片过程:
(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片;
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片;’
测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)
最终会形成 3 个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
实操:
目的:输入4个小文件合并成一个切片统一处理;

实现过程:
1、不做任何处理运行WordCount案例,观察切片个数为4;

2、在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 3。
驱动类中添加代码如下:
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
运行如果为 3 个切片。

3、在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 1。
驱动中添加代码如下:
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);
运行如果为 1 个切片。

FileInputFormat实现类

TextInputFormat:处理文本
KeyValueInputFormat:处理KV对数据的
NLineInputFormat:按行处理的
CombineTextInputFormat:处理小文件的

就可以通过实例来对比,一下是在Mapper中打印的key的情况,与原有数据的对比
System.out.println(key.toString() + ", " + value.toString());


实操:
目的:统计输入文件中每一行第一个单词相同的行数
输入数据:

输出数据:
caocao 2
caozhi 1
liubei 2
Map阶段
1、读取数据:caocao shi wei wu di
2、设置key和value:<caocao, 1>
3、写出
Reduce阶段
1、读取数据:<caocao, 1>、<caocao, 1>
2、汇总:<caocao, 2>
Driver处理:
1、设置切割符,默认是\t及Tab分割
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
2、设置输入格式,默认是TextInputFormat
job.setInputFormatClass(KeyValueTextInputFormat.class);
Mapper代码如下:
public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {
IntWritable v = new IntWritable(1);
@Override
protected void map (Text key, Text value, Context context) throws IOException, InterruptedException {
System.out.println(key.toString() + ", " + value.toString());
context.write(key, v);
}
}
Reducer代码
public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();
@Override
protected void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
v.set(sum);
context.write(key, v);
}
}
Driver代码如下
public class KVTextDriver {
public static void main (String[] args) throws Exception {
args = new String[]{"/home/lxj/hadoop-data/input/KVText", "/home/lxj/hadoop-data/output/KVText"};
Configuration conf = new Configuration();
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
Job job = Job.getInstance(conf);
job.setJarByClass(KVTextDriver.class);
job.setMapperClass(KVTextMapper.class);
job.setReducerClass(KVTextReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
Mapper读取的数据如下:

最终的运行结果如下;
 
实操:
对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少切片,本次使用3行作为每个切片的大小
输入数据:
caocao shi wei wu di
liubei shi shu zhao lie di
liubei shi shu guo de
caocao shi wei guo de
caozhi shi caocao de er zi
liushan shi liu bei de er zi
caozhi ye shi caocao de er zi
输出结果为:3片
Map阶段:
1、获取一行数据;
2、切割;
3、循环写出;
Reduce阶段:
1、汇总;
2、写出;
Driver:
1、设置InputFormat类型
2、设置NlineInoutFormat为3行一切片
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);
结果如下:

bei 1
caocao 4
caozhi 2
de 5
di 2
er 3
guo 2
lie 1
liu 1
liubei 2
liushan 1
shi 7
shu 2
wei 2
wu 1
ye 1
zhao 1
zi 3

案例实操:
无论 HDFS 还是 MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义 InputFormat 实现小文件的合并。合并的文件,在之后要处理时,启动job处理只需要处理合并之后的文件就可以了
目的:
将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。
输入数据:
wei.txt
caocao shi wei wu di
caocao shi wei guo de
caozhi shi caocao de er zi
caozhi ye shi caocao de er zi
shu.txt
liubei shi shu zhao lie di
liubei shi shu guo de
liushan shi liu bei de er zi
wu.txt
sunquan shi wu da di
sunce shi sunquan ta ge
sunjian shi ta men de ba
输出一个SequceFile文件,它是一个二进制顺序文件。由于它的格式紧凑,很容易被压缩,因此如果输出需要作为后续的MapReduce任务的输入,这便是一个很好的输出格式。

InputFormat继承类
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
public RecordReader<Text, BytesWritable> createRecordReader (InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
WholeRecordReader recordReader = new WholeRecordReader();
recordReader.initialize(inputSplit, context);
return recordReader;
}
}
ReadRecord继承类,ReadRecord决定了本次处理怎么读取数据内容
public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
FileSplit split;
Configuration configuration;
Text key = new Text();
BytesWritable value = new BytesWritable();
boolean isProgress = true;
@Override
public void initialize (InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
this.split = (FileSplit) inputSplit;
this.configuration = context.getConfiguration();
}
@Override
public boolean nextKeyValue () throws IOException, InterruptedException {
if (isProgress) {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(configuration);
FSDataInputStream fis = fs.open(path);
byte[] buf = new byte[(int) split.getLength()];
IOUtils.readFully(fis, buf, 0, buf.length);
value.set(buf, 0, buf.length);
key.set(path.toString());
IOUtils.closeStream(fs);
isProgress = false;
return true;
}
return false;
}
@Override
public Text getCurrentKey () throws IOException, InterruptedException {
return key;
}
@Override
public BytesWritable getCurrentValue () throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress () throws IOException, InterruptedException {
return 0;
}
@Override
public void close () throws IOException {
}
}
Mapper和Reducer就是对读取的数据进行写出或者循环写出。
Driver中关键之处在于
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
结果如下:

执行过程是先执行InputFormat中的nextKeyValue(),再执行map(),然后循环执行直到 nextKeyValue返回false。 
接下来执行reduce()。
总结一下FileInoutFormat实现类:
TextInputFormat:按照块的大小进行切片;Key为行数据偏移量;Value为行数据;
KeyValueTextInputFormat:按照块的大小进行切片;Key为根据分隔符分割之后的第一列;Value为为根据分隔符分割之后的第一列之后的数据;
NLineInputFormat:按行进行切片;Key为行数据偏移量;Value为行数据;
CombineTextInputFormat:按照设置的最大值进行切片;Key为行数据偏移量;Value为行数据;
上面的自定义InputFormat:按照块的块大小进行切片;Key为文件绝对路径;Value为文件内容;(这个可以根据需求去进行自定义)
MapReduce工作流程

上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第16 步结束,具体 Shuffle 过程详解,如下:
1、MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中
2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、多个溢出文件会被合并成大的溢出文件
4、在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序
5、ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据
6、ReduceTask 会取到同一个分区的来自不同 MapTask 的结果文件(如果大文件则存在磁盘,如果是小文件则直接在内存中处理),ReduceTask 会将这些文件再进行合并(归并排序)
7、合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group(相同key的分在一组,每次取一组),调用用户自定义的 reduce()方法)
备注:
a、环形缓冲区:默认100M,存在于内存中,当其中保存的数据达到80%的时候,在空闲位置的中间开始写入数据(此时是反向写入数据),并将之前保存在内存中的数据输出到磁盘中;环形缓冲区中将会保存KV数据以及其对应的索引信息。
b、索引信息包括:key开始位置、value开始位置、分区信息、以及索引。
c、先分区,后排序,在分区内部进行排序。排序是用key进行字典顺序排序,排序方法是快速排序
d、启动ReduceTask的数量由分区决定,多少个分区就有多少个ReduceTask。
|