一、切片与MapTask并行度决定机制
1. 数据块
Blocak是HDFS物理上把数据分为一块一块的,数据块是HDFS存储数据的单位
2. 数据切片
数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储,数据切片是MapReduce程序计算输入数据的单位,一个切片会对应启动一个MapTask
3. 说明
<1>一个Job的Map阶段并行度由客户端在提交Job时的切片数决定 <2>每一个Split切片分配一个MapTask并行实例处理 <3>默认情况下,切片大小=BlockSize <4>切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
二、Job提交流程源码和切片源码
1. Job提交流程源码详解
waitForCompletion()
submit();
connect();
new Cluster(getConfiguration());
initialize(jobTrackAddr, conf);
submitter.submitJobInternal(Job.this, cluster)
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
JobID jobId = submitClient.getNewJobID();
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);
writeConf(conf, submitJobFile);
conf.writeXml(out);
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

2. 切片源码详解
<1>先找到数据存储目录

<2>遍历目录下的每一个文件

<3>切片
- 获取文件大小fs.size(ss.txt)
 - 计算切片
默认情况下,切片大小等于BlockSize    - 开始切片,每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分为一个块切片。
 - 将切片信息写道一个切片规划文件中
 - 整个切片的核心过程在getSplit()方法中完成
- InputSplit只记录了切片的元数据信息,比如起始位置,长度以及所在节点列表等
<4>提交切片规划文件到YARN
YARN上的MrAppMaster就根据切片规划文件计算开启MapTask个数
三、FileInputFormat切片大小参数配置
1. 源码切片大小计算
        
2. 切片大小设置

3. 获取切片信息API

四、TextInputFormat
- 是FileInputFormat的默认实现类
- 按照行读取每条记录
- key是整个文件的起始字节偏移量,LongWritable类型,value是这行的内容,不包括任何终止符(换行符,回车符),Text类型

五、CombineTextInputFormat
1. 继承关系

2. 应用场景
默认TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量MapTask,效率变低 因此,CombineTextFormat适合用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
3. 虚拟存储切片最大值设置

4. 虚拟存储过程

5. 切片机制
<1>判单虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片 <2>如果不大于则更下一个虚拟存储文件进行合并,共同形成一个切片。 
六、 CombineTextInputFormat案例
1. 准备四个文件,并修改输入路径
采用之前的wordCount案例,可查看之前的博客 
2. 在不修改的情况下,运行结果
采用TextInputFormat分片方式,分为了四个切片,调用了四个MapTask 
3. 在WordCountDriver中添加以下代码
添加代码
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

运行结果
分为了三个切片,调用了三个MapTask 
4. 修改虚拟存储切片的最大值
修改代码

运行结果
分为了一个切片,调用了一个MapTask 
|