说在前面
? 今天的视频在讲解源码,不出意外的我又在迷糊中听了大半,老师在后面摇了好几下我才醒,然后就被安排了一个伟大而艰巨的任务——学会自己看源码……
? 怎么办呢?那就看嘛!在看之前我还不忘百度搜索一下看源码的好处。下方为知乎某大佬原话
“我为什么读源码”
很多人一定和我一样的感受:源码在工作中有用吗?用处大吗?很长一段时间内我也有这样的疑问,认为哪些有事没事扯源码的人就是在装,只是为了提高他们的逼格而已。
那为什么我还要读源码呢?一刚开始为了面试,后来为了解决工作中的问题,再后来就是个人喜好了。说的好听点是有匠人精神;说的委婉点是好奇(底层是怎么实现的);说的不自信点是对黑盒的东西我用的没底,怕用错;说的简单直白点是提升自我价值,为了更高的薪资待遇(这里对真正的技术迷说声抱歉)。
源码中我们可以学到很多东西,学习别人高效的代码书写、学习别人对设计模式的熟练使用、学习别人对整个架构的布局,等等。如果你还能找出其中的不足,那么恭喜你,你要飞升了!会使用固然重要,但知道为什么这么使用同样重要。从模仿中学习,从模仿中创新。
读源码不像围城(外面的人想进来,里面的人想出去),它是外面的人不想进来,里面的人不想出去;当我们跨进城内,你会发现(还是城外好,皮!)城内风光无限,源码的海洋任我们遨游!
走!看源码!看源码!
人话模式
1.探索前的热身
一个结论:
FileInputFormat 是InputFormat的子实现类,实现切片逻辑 TextInputFormat是FileInputFormat的子实现类,实现读取数据逻辑
先探索一下TextInputFormat怎么就实现了读取数据的逻辑吧!
故事得从提交作业开始说起……
脑海里对提交作业后有个大概的思路:提交job–>然后……->mapreduce。中间省略若干步骤(为什么不写大家懂得都懂)
在提交job的方法文档注释上我发现了以下珍贵文案,于是三年没学英语的我站出来了!
/**
* Internal method for submitting jobs to the system.
译:系统提交文件的内部方法。
*
* <p>The job submission process involves:
译:作业提交过程涉及到:
* <ol>
* <li>
* Checking the input and output specifications of the job.
译:检查作业的输入和输出的规格。
* </li>
* <li>
* Computing the {@link InputSplit}s for the job.
译:计算作业的输入切片。
* </li>
* <li>
* Setup the requisite accounting information for the
* {@link DistributedCache} of the job, if necessary.
译:如果必要的话,为作业的分布式缓存设置必要的计算信息。
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system
* directory on the distributed file-system.
译:拷贝作业的jar包和配置到hdfs的MapReduce系统的目录
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally
* monitoring it's status.
译:提交作业到jobTracker(大概是作业追踪器)然后可选地追踪它的状态。
所以!大概思路就是上面的译文了!
总结一下:
-
作业提交过程涉及到:
1. **检查作业的输入和输出的规格。**
2. **计算作业的输入切片。**
3. **如果必要的话,为作业的分布式缓存设置必要的计算信息。**
4. **拷贝作业的jar包和配置到hdfs的MapReduce系统的目录**
5. **提交作业到jobTracker(大概是作业追踪器)然后可选地追踪它的状态。**
让我们把今天的目光聚焦到切片机制。我们往下找啊找….终于找到了写切片的地方啦,进入writeSplits方法。
方法体如下:
writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
}
你还别说,确实…看不大懂。不过我们看第6、7行还是没问题的,这个逻辑判断就是看你使用的Mapper是旧版的还是新版的,好,我们走进writeNewSplits……
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
}
代码依旧那么复杂呀,不过我还记得第一步是检查输入文件的规格,因此我们要看看它是怎么检查的。于是走进第六行job.getInputFormatClass方法。
显然这是个接口方法,我们要做的是找到它具体的实现逻辑,因此点击左边框内的小绿点找它的具体实现类,如下:
我们知道现在提交的job与map和reduce无关,而最有可能的是最JobContext的实现方法,因此点进去
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
注意力只要放在第十行代码那里,INPUT_FORMAT_CLASS_ATTR的值为TextInputFormat.class,所以,当我们不指定输入文件规格的时候呢,默认的规格是文本输入规格,这也就是为什么我们在对文件做Mapreduce的时候需要指定输入和输出文件规格了!
这也佐证了为什么TextInputFormat负责读取数据的逻辑了!
看似没用, 其实锻炼了我们探索的能力!
2.探索ing
探索1的结论还记得吗?
FileInputFormat 是InputFormat的子实现类,实现切片逻辑。 TextInputFormat是FileInputFormat的子实现类,实现读取数据逻辑。
怎么样通过抽象方法找到它的实现呢?这就用到啦!
教你们个小技巧,首先找到InputFormat这个抽象类,我们通过(ALT+7)类结构惊奇的发现:
它定义了两个抽象方法恰恰是切片(getSplits)和读取数据(createRecordReader)。我们目的专一。
点击左框绿点,找到它的实现逻辑—-FileInputFormat类
准备好了吗?准备好了!
各单位注意!!!源码来啦!!!
现在是2021年11月12日21:37:56,会是什么时候搞完睡觉呢?
源码
各单位注意,中文注释是小鹏自己理解来写的,其他都是源码作者写的。
public List<InputSplit> getSplits(JobContext job) throws IOException {
StopWatch sw = new StopWatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
boolean ignoreDirs = !getInputDirRecursive(job)
&& job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
for (FileStatus file: files) {
if (ignoreDirs && file.isDirectory()) {
continue;
}
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
}
else {
if (LOG.isDebugEnabled()) {
if (length > Math.min(file.getBlockSize(), minSize)) {
LOG.debug("File is not splittable so no parallelization "
+ "is possible: " + file.getPath());
}
}
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
}
else {
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits;
}
说在后面
现在的时间是2021年11月12日23:30:42。
第一次认真的探索源码,我有点喜欢上了这种探索的感觉,一定会有下一次,下下次以及很多次的。
|