# MR Workflow
## Job Submission
The client calls Job.waitForCompletion, which calls Job.submit. setUseNewAPI() reconciles the old and new API settings, connect() establishes the connection to the cluster (YARN), and at this point the job is still in JobState.DEFINE. submitter.submitJobInternal then runs several steps.

### Check whether the output path exists

checkSpecs(job) validates the output specification; this is where the familiar "forgot to delete the output directory" error comes from:

```java
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
  throw new FileAlreadyExistsException("Output directory " + outDir +
      " already exists");
}
```
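As a reference point for the rest of the walkthrough, a minimal driver that triggers this submission path might look like the sketch below (WordCountMapper and WordCountReducer are hypothetical user classes, and the paths are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);     // hypothetical user Mapper
    job.setReducerClass(WordCountReducer.class);   // hypothetical user Reducer
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // If args[1] already exists, checkSpecs() throws FileAlreadyExistsException.
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion() -> submit() -> connect() -> submitJobInternal()
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```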
### Create the staging path
The call chain is JobSubmissionFiles.getStagingDir(cluster, conf) -> Cluster.getStagingAreaDir -> YARNRunner.getStagingAreaDir -> ResourceMgrDelegate.getStagingAreaDir -> MRApps.getStagingAreaDir:

```java
private static final String STAGING_CONSTANT = ".staging";

public static Path getStagingAreaDir(Configuration conf, String user) {
  return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
      MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
      + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
}
```

That is, conf.get("yarn.app.mapreduce.am.staging-dir", "/tmp/hadoop-yarn/staging") + "/" + user + "/.staging", so the default staging area is /tmp/hadoop-yarn/staging/${user}/.staging.
### Get the job ID
JobID jobId = submitClient.getNewJobID() (TODO: the details are fairly involved). The job-related files are uploaded to submitJobDir: submitJobDir = new Path(jobStagingArea, jobId.toString()), i.e. /tmp/hadoop-yarn/staging/${user}/.staging/${jobid}.
### Upload the job jar to the cluster
copyAndConfigureFiles() -> JobResourceUploader.uploadResources(job, jobSubmitDir) -> JobResourceUploader.uploadResourcesInternal. mkdirs(jtFs, submitJobDir, mapredSysPerms) creates the submitJobDir directory, then the upload logic runs:

```java
uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
    fileSCUploadPolicies, statCache);
uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
    fileSCUploadPolicies, statCache);
uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
    archiveSCUploadPolicies, statCache);
uploadJobJar(job, jobJar, submitJobDir, replication, statCache);
addLog4jToDistributedCache(job, submitJobDir);
```
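The files, libjars, and archives above come from what the user attached to the job, typically via the -files / -libjars / -archives options parsed by GenericOptionsParser or via the Job API. A minimal sketch, assuming the job object from the driver example above (the URIs are placeholders, and the resources must already be reachable by the cluster):

```java
// Attach side resources to the job; they end up in the distributed cache
// alongside whatever uploadResourcesInternal copies into submitJobDir.
job.addCacheFile(new java.net.URI("hdfs:///apps/lookup/dict.txt"));
job.addFileToClassPath(new org.apache.hadoop.fs.Path("/apps/lib/extra-lib.jar"));
job.addCacheArchive(new java.net.URI("hdfs:///apps/data/geo.zip"));
```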
### Compute the input splits and write the split plan files
writeSplits(job, submitJobDir) uses writeNewSplits(job, jobSubmitDir); List<InputSplit> splits = input.getSplits(job) calls FileInputFormat.getSplits.
Inside getSplits:

- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); getFormatMinSplitSize() returns 1, and getMinSplitSize(job) reads mapreduce.input.fileinputformat.split.minsize (default 1).
- long maxSize = getMaxSplitSize(job) reads mapreduce.input.fileinputformat.split.maxsize (default Long.MAX_VALUE).
- The input files are iterated and each file is split independently.
- isSplitable decides whether the file can be split at all; snappy-compressed files, for example, cannot be split.
- The block size is read with long blockSize = file.getBlockSize(), and computeSplitSize returns Math.max(minSize, Math.min(maxSize, blockSize)). With the defaults, maxSize is Long.MAX_VALUE and blockSize is 128 MB, so the inner min picks blockSize; blockSize is also larger than minSize (1), so the split size equals the block size.

```java
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
      blkLocations[blkIndex].getHosts(),
      blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}
```

JobSplitWriter.createSplitFiles then writes the split files. Setting maxsize below the block size makes the splits smaller; raising minsize above the block size makes them larger (see the sketch below).
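A minimal sketch of tuning the split size from the driver, equivalent to setting the two properties above (the numbers are only examples):

```java
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Cap splits at 64 MB: computeSplitSize = max(minSize, min(maxSize, blockSize)) = 64 MB,
// so each 128 MB block yields two splits (and two map tasks).
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);

// Or force splits of at least 256 MB: the max() side then wins over the 128 MB block size.
// FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024);
```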
### Write job.xml to the staging path
writeConf(conf, submitJobFile). The write path is conf.writeXml(out) -> TransformerImpl.transform -> transform(source, toHandler, _encoding) -> transferOutputProperties -> DOM2TO.parse, i.e. the job configuration is serialized with Java's DOM XML machinery.
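Configuration.writeXml is public API, so the same serialization can be reproduced standalone (a sketch; the output file name is arbitrary):

```java
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;

public class DumpConf {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.name", "demo");      // example property
    try (FileOutputStream out = new FileOutputStream("job.xml")) {
      conf.writeXml(out);                        // same call used by writeConf()
    }
  }
}
```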
## Submitting to YARN
### Building the application submission context
After the steps above, submitClient.submitJob is called. ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); setupAMCommand fills in a long list of parameters, among them vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS), i.e. "org.apache.hadoop.mapreduce.v2.app.MRAppMaster". The context is then handed to the ResourceManager via resMgrDelegate.submitApplication(appContext).
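Under the hood, ResourceMgrDelegate wraps a YarnClient. Submitting any application to the RM looks roughly like this generic sketch (this is not the MR-specific code, and the resource numbers are placeholders):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;

public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new Configuration());
    yarnClient.start();

    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("demo-app");
    appContext.setResource(Resource.newInstance(1536, 1)); // AM container: memory MB, vcores
    // appContext.setAMContainerSpec(...) would carry the launch command,
    // which for MapReduce is the MRAppMaster class set in setupAMCommand().

    yarnClient.submitApplication(appContext);
  }
}
```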
### State transitions
This part is quite involved. Through a series of state transitions and registrations between the RM, the AM, and the NM, the MRAppMaster is finally launched: RMAppEventType.START -> RMStateStore STORE_APP -> RMAppEventType.APP_NEW_SAVED -> FifoScheduler APP_ADDED -> RMAppEventType.APP_ACCEPTED -> RMAppAttemptEventType.START -> FifoScheduler APP_ATTEMPT_ADDED -> RMAppAttemptEventType.ATTEMPT_ADDED -> RMNodeEventType.STATUS_UPDATE -> FifoScheduler NODE_UPDATE -> RMAppAttemptEventType.CONTAINER_ALLOCATED -> RMAppAttemptState.ALLOCATED_SAVING -> RMStateStoreEventType.STORE_APP_ATTEMPT -> RMAppAttemptEventType.ATTEMPT_NEW_SAVED -> RMAppAttemptEventType.LAUNCHED -> ContainerEventType.INIT_CONTAINER -> ContainersLauncherEventType.LAUNCH_CONTAINER
Finally containerLauncher.submit(launch) is called; in ContainerLaunch's call(), exec.activateContainer(containerID, pidFilePath) is invoked, and the launched container ends up running the main method of org.apache.hadoop.mapreduce.v2.app.MRAppMaster.
## MRAppMaster
MRAppMaster is the MapReduce implementation of an ApplicationMaster; it is what allows the MapReduce framework to run on YARN. It manages the lifecycle of a MapReduce job: creating the job, requesting resources from the ResourceManager, asking NodeManagers to launch containers, monitoring the job's progress, and restarting tasks when they fail. From main(), the call continues through initAndStartAppMaster -> appMaster.start() -> serviceStart(); this serviceStart() is MRAppMaster's own override of serviceStart.
1. Call createJob() to create the Job instance job:

```java
protected Job createJob(Configuration conf, JobStateInternal forcedState,
    String diagnostic) {
  Job newJob =
      new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
          taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
          completedTasksFromPreviousRun, metrics,
          committer, newApiCommitter,
          currentUser.getUserName(), appSubmitTime, amInfos, context,
          forcedState, diagnostic);
  ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);

  // Once the job is created, JobFinishEventHandler is registered as the
  // handler for the job-completion event JobFinishEvent.
  dispatcher.register(JobFinishEvent.Type.class, createJobFinishEventHandler());
  return newJob;
}
```

2. Create a job initialization event, initJobEvent.
3. handle() processes the initJobEvent.
4. Start the client service: clientService.start().
5. Call the parent class's serviceStart() to start all components: super.serviceStart().
6. Call startJobs() to start the job: startJobs(). This again goes through a series of state transitions and ends with containerLauncher.submit(launch), which invokes YarnChild's main method.
## YarnChild
final Task taskFinal = task; taskFinal.run(job, umbilical). Task is implemented by MapTask and ReduceTask, so run() enters the corresponding logic.
## MapTask
MapTask.run first sets up the progress phases, then checks whether this is a cleanup task, and then enters runNewMapper.

1. Create a TaskAttemptContextImpl instance; the Configuration inside it is the job itself. The client uploaded three kinds of artifacts to the resource layer (the jar, the configuration file, and the split files), so the container has everything it needs to instantiate the job.
2. Reflectively create the user-defined Mapper implementation, i.e. the class that holds the map function:

```java
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
```
3. Get the InputFormat implementation:

```java
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
```

The default is TextInputFormat:
```java
public Class<? extends InputFormat<?,?>> getInputFormatClass()
    throws ClassNotFoundException {
  return (Class<? extends InputFormat<?,?>>)
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
```
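INPUT_FORMAT_CLASS_ATTR is what the driver sets when it overrides that default, for example in the driver sketched earlier (KeyValueTextInputFormat is just one built-in alternative):

```java
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

// Overrides the TextInputFormat default returned by getInputFormatClass().
job.setInputFormatClass(KeyValueTextInputFormat.class);
```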
4. Get the InputSplit for this task: split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()). splitIndex comes from the splits created by createSplits in JobImpl.transition; the split file location and offset determine which slice of data this map task reads.
5. Through the InputFormat, obtain the RecordReader that reads the split one record at a time: new NewTrackingRecordReader<INKEY,INVALUE>(split, inputFormat, reporter, taskContext); its field private final org.apache.hadoop.mapreduce.RecordReader<K,V> real is set with this.real = inputFormat.createRecordReader(split, taskContext), which is a LineRecordReader.
6. Create the RecordWriter instance. With no reducers it uses NewDirectOutputCollector; with reducers it uses NewOutputCollector, which ① initializes the ring buffer: collector = createSortingCollector(job, reporter); collector.init(context); and ② sets up the partitioner:

```java
partitions = jobContext.getNumReduceTasks();
if (partitions > 1) {
  partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
  partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
    @Override
    public int getPartition(K key, V value, int numPartitions) {
      return partitions - 1;
    }
  };
}
```
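getPartitionerClass defaults to HashPartitioner; a custom partitioner is a small subclass registered from the driver. A sketch, with key/value types and routing rule invented for illustration:

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Route keys starting with 'a'..'m' to one reducer, the rest to another.
    char c = key.toString().isEmpty() ? 'z' : Character.toLowerCase(key.toString().charAt(0));
    return (c <= 'm' ? 0 : 1) % numPartitions;
  }
}
// In the driver:
//   job.setPartitionerClass(FirstLetterPartitioner.class);
//   job.setNumReduceTasks(2);   // partitions = jobContext.getNumReduceTasks()
```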
7. Initialize the input: input.initialize(split, mapperContext), i.e. LineRecordReader.initialize.
8. Run the mapper: mapper.run(mapperContext) calls setup, map, and cleanup, i.e. the user-defined methods; context.write writes into the ring buffer (see the sketch below).
9. Close the input and output streams (input.close(), then the output is closed); closing the output triggers the final spill.
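Step 8 is where user code runs. A minimal Mapper illustrating the setup/map/cleanup hooks and context.write (the word-count logic is only an example, matching the hypothetical WordCountMapper from the driver sketch):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private final Text word = new Text();
  private static final IntWritable ONE = new IntWritable(1);

  @Override
  protected void setup(Context context) { /* called once, before any map() call */ }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    for (String token : value.toString().split("\\s+")) {
      if (token.isEmpty()) continue;
      word.set(token);
      context.write(word, ONE);   // lands in MapOutputBuffer.collect()
    }
  }

  @Override
  protected void cleanup(Context context) { /* called once, after the last map() call */ }
}
```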
### MapOutputBuffer (the ring buffer)
#### Parameters

- kvbuffer: the ring buffer itself
- kvmeta: the metadata buffer (a view over kvbuffer)
- kvindex: start position of the next meta record to insert
- kvstart: start position of the meta data being spilled
- kvend: end position of the meta data being spilled
- bufindex: end position of the raw (serialized) data
- bufstart: start position of the raw data being spilled
- bufend: end position of the raw data being spilled
- equator: the midpoint of the buffer separating raw data from metadata
- spillper: the usage fraction that triggers a spill, set by "mapreduce.map.sort.spill.percent", default 0.8
- sortmb: total memory occupied by kvbuffer in MB, set by "mapreduce.task.io.sort.mb", default 100
- indexCacheMemoryLimit: cache size for spill-file index records, set by "mapreduce.task.index.cache.limit.bytes", in bytes, default 1024*1024 (1 MB)
- bufferRemaining: remaining buffer space, in bytes
- softLimit: the spill threshold in bytes; once exceeded, data must be written to disk
#### Initialization (init)

The in-buffer sorter is set to QuickSort. kvbuffer is allocated with sortmb bytes (100 MB by default), equator starts at 0, and kvindex = (int) (((long) aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4, where aligned = pos - (pos % METASIZE).
bufstart = bufend = bufindex = equator; bufferRemaining starts out equal to softLimit, where softLimit = (int) (kvbuffer.length * spillper), i.e. 80 MB by default.
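A quick sketch of the default numbers (plain arithmetic, not the MapOutputBuffer code itself):

```java
// Defaults: mapreduce.task.io.sort.mb = 100, mapreduce.map.sort.spill.percent = 0.80
int sortmb = 100;
float spillper = 0.80f;
int kvbufferLength = sortmb << 20;                  // 104,857,600 bytes (100 MB)
int softLimit = (int) (kvbufferLength * spillper);  // 83,886,080 bytes (~80 MB)
int METASIZE = 4 * 4;                               // 16 bytes of metadata per record:
                                                    // PARTITION, KEYSTART, VALSTART, VALLEN
System.out.println(softLimit);                      // spilling starts once usage crosses this
```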
Writing into the buffer: the collect method. When no spill is in progress and nothing is blocked, bufferRemaining is first reduced by 16 bytes (METASIZE) to reserve the kvmeta slot. keystart is the current bufindex (0 for the first record). The key is then serialized; the serializer ends up in System.arraycopy(b, off, kvbuffer, bufindex, gaplen), bufindex advances past the serialized key (its length header plus the key bytes), and bufferRemaining shrinks by the same amount.

valstart is then the current bufindex; the value is serialized the same way, bufindex advances past the serialized value, and bufferRemaining shrinks again. kvbuffer now holds, for each record, the serialized key followed by the serialized value.

Then the metadata is filled in: kvmeta.put(kvindex + PARTITION, partition) (the partition, hash-based by default), kvmeta.put(kvindex + KEYSTART, keystart) (the key's start position), kvmeta.put(kvindex + VALSTART, valstart), kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)). Finally kvindex is updated: kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity(), i.e. it keeps moving backwards by 16 bytes (4 ints) per record.
After a spill, the ring buffer parameters have to be re-adjusted:
```java
final int avgRec = (int)
    (mapOutputByteCounter.getCounter() /
     mapOutputRecordCounter.getCounter());
final int distkvi = distanceTo(bufindex, kvbidx);
final int newPos = (bufindex +
    Math.max(2 * METASIZE - 1,
             Math.min(distkvi / 2,
                      distkvi / (METASIZE + avgRec) * METASIZE)))
    % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
bufferRemaining = Math.min(
    distanceTo(bufend, newPos),
    Math.min(
        distanceTo(newPos, serBound),
        softLimit)) - 2 * METASIZE;
```
#### Spill

Writing continues this way until bufferRemaining <= 0; if there is too little data to ever cross the threshold, the spill still happens when the input/output is closed, i.e. during flush. The core is sortAndSpill: the data is sorted within each partition (QuickSort), the combiner is invoked if one is configured, and a spill file is written partition by partition (see the tuning sketch below).
flush finally merges the spill files via mergeParts().
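The spill behavior is the part most often tuned from the driver. A hedged sketch of the relevant knobs (the values are examples only, and WordCountReducer is the hypothetical Reducer from the driver sketch):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpillTuning {
  static void tune(Job job) {
    Configuration conf = job.getConfiguration();
    // A bigger ring buffer and a later spill threshold mean fewer spill files to merge.
    conf.setInt("mapreduce.task.io.sort.mb", 200);            // kvbuffer size, default 100 MB
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.9f);  // softLimit fraction, default 0.8
    // A combiner runs during sortAndSpill (and again while merging spill files),
    // shrinking map output before it is written or shuffled.
    job.setCombinerClass(WordCountReducer.class);
  }
}
```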
## ReduceTask
ReduceTask.run creates the shuffleConsumerPlugin object, i.e. the Shuffle class. There are three phases: copyPhase, sortPhase, and reducePhase.
### copyPhase
shuffleConsumerPlugin.init(shuffleContext); RawKeyValueIterator rIter = shuffleConsumerPlugin.run() starts the threads that monitor map completion: an EventFetcher thread that receives map-completion events, and LocalFetcher or Fetcher threads that pull the map-side output data.
#### Fetcher threads
1. Wait for merges to finish: merger.waitForResource().
2. Get a host: host = scheduler.getHost().
3. Pull the data: copyFromHost(host) -> getMapOutputURL -> openShuffleUrl -> connect -> copyMapOutput. copyMapOutput creates a MapOutput instance based on the data size; usually this is an InMemoryMapOutput, i.e. the data is kept in memory (when the output is too large to fit in memory, an OnDiskMapOutput is created and the data goes straight to disk). mapOutput.shuffle calls doShuffle, which has separate InMemoryMapOutput and OnDiskMapOutput implementations; InMemoryMapOutput copies the data with IOUtils.readFully(input, memory, 0, memory.length), and MergeThread.startMerge later merges the in-memory data to disk.

Relevant configuration (see the sketch after this list):

- mapreduce.job.reduce.slowstart.completedmaps: fraction of map tasks that must complete before fetching starts, default 0.05
- mapreduce.reduce.shuffle.read.timeout: read timeout for a fetch, default 3 minutes
- mapreduce.reduce.shuffle.maxfetchfailures: maximum number of fetch failures, default 10
- mapreduce.reduce.shuffle.connect.timeout: connection timeout for a fetch, also 3 minutes by default
- mapreduce.reduce.shuffle.parallelcopies: number of fetch threads created in parallel
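A hedged sketch of setting these from the job configuration (the values are illustrative, not recommendations):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleTuning {
  static void tune(Job job) {
    Configuration conf = job.getConfiguration();
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.8f); // start fetching later
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 10);          // more Fetcher threads
    conf.setInt("mapreduce.reduce.shuffle.connect.timeout", 60_000);     // ms
    conf.setInt("mapreduce.reduce.shuffle.read.timeout", 60_000);        // ms
  }
}
```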
Once these threads finish, the copy phase is complete.
### sortPhase
MergeManagerImpl merger.close() produces the RawKeyValueIterator.
### reducePhase
RawComparator comparator = job.getOutputValueGroupingComparator() is the grouping comparator. runNewReducer wraps the key/value data: final RawKeyValueIterator rawIter = rIter; rIter = new RawKeyValueIterator() {...}; createReduceContext builds the context, and reducer.run executes the reduce (see the sketch below).
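The grouping comparator is what a driver overrides for secondary-sort style jobs. A minimal sketch, with the composite-key layout invented for illustration:

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Group records by the part of the key before the first '#', so one reduce() call
// sees all values whose keys share that prefix.
public class PrefixGroupingComparator extends WritableComparator {
  protected PrefixGroupingComparator() {
    super(Text.class, true);  // create key instances so compare() gets deserialized keys
  }

  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    String ka = a.toString().split("#", 2)[0];
    String kb = b.toString().split("#", 2)[0];
    return ka.compareTo(kb);
  }
}
// In the driver:
//   job.setGroupingComparatorClass(PrefixGroupingComparator.class);
```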
### Writing the output
reduceContext.write -> LineRecordWriter.write (TextOutputFormat's record writer).