-
input split :输入切片信息。我们知道,MapTask的数量是根据文件的切片,而这个输入的切片信息从哪里来呢?这是根据客户端在submit() 前,获取待处理数据的信息,然后根据参数配置,对文件形成一个任务分配的划分,它会将一个文件以默认的切片方式进行切片,生成一个job.split文件给map方法。
-
map:map方法。在Mapper中,调用context.write(key,value)实际是调用代理NewOutPutCollector的wirte方法,然后调用了MapOutPutBuffer的collect(),且在进行收集前,会调用partitioner来计算每个key-value的分区号。
-
buffer in memory: 内存中的缓冲区。也就是俗称的环形缓冲区,默认大小是100M(io.sort.mb)的字节数组(kvbuffer = new byte[100 << 20]),数据的默认开始位置是0(bufstart = bufend = bufindex = equator = 0),当kvindex == kvend == kvstart时,缓冲区为空。
数据和元数据以相反的方向进行输入,当缓冲区内空间不足20%(io.sort.spill.percent 默认为0.8,80%)时,他就开始准备往外溢写数据
-
partition sort and spill to disk:分区排序并溢出到磁盘。而在写文件之前缓冲区内会进行一个分区和快速排序(这个快排只是对元数据进行了排序,真实数据未变)。排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
然后按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。每次溢写都会产生一个临时文件。
如果定义了Combiner,使用CombinerRunner.conbine()对key-value处理(combiner)后再次溢写!
最后将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
-
merge on disk:在磁盘上合并。当 Shuffle 机制结束后,会在磁盘上看到很多溢写的临时文件。而当所有数据处理(这里所有数据指当前一个MapTask需要处理的数据)结束后,MapTask会以分区为单位进行合并,对于某个分区,它将采用多轮递归合并的方式。每轮合并10(io.sort.factor默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。这之后,如果用户设置了Combiner(前提是溢写的文件个数大于3 min.num.splits.for.combine默认为3),则写入文件之前,对每个分区中的数据再进行一次区内聚集操作。并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
最终生成一个最终的文件,这个文件根据总区号,分为若干部分,每个部分的key-value都已经排好序,等待ReduceTask来拷贝相应分区的数据
若定义了压缩,则在生成最终文件之前数据将会被压缩。