简介:本文对MapReduce物理、逻辑工作流以及shuffle过程做简要整理
MapReduce采用的是分而治之的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个从节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单来说,MapReduce就是”任务的分解与结果的汇总。
物理实体工作流
MapReduce体系结构主要由四个部分组成, 分别是:Client、JobTracker、TaskTracker以及Task
组成结构
Client
JobTracker
-
JobTracker位于NameNode上 -
JobTracker负责资源监控和作业调度,并与TaskTracker通信 -
JobTracker监听TaskTracker的健康状况heartbeat,一旦失败就将任务转移至其他节点 -
JobTracker通过任务调度器TaskScheduler选择任务的执行次序,以便为基于队列的FIFO调度器
TaskTracker
TaskTracker位于DataNode中,是JobTracker和Task的桥梁
- 汇报心跳:TaskTracker周期性将所有节点上各种信息通过心跳机制汇报给JobTracker。这些信息包括两部分:
- 机器级别信息:节点健康情况、资源使用情况等。
- 任务级别信息:任务执行进度、任务运行状态等。
- 执行命令:JobTracker会给TaskTracker下达各种命令,主要包括:启动任(LaunchTaskAction)、提交任务(CommitTaskAction)、杀死任务(KillTaskAction)、杀死作业(KillJobAction)和重新初始化(TaskTrackerReinitAction)。
- TaskTracker 使用“slot”等量划分本节点上的资源量(CPU、内存等)。一个Task 获 取到一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的 空闲slot分配给Task使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和 Reduce Task 使用
Task
Task分为Map Task 和Reduce Task 两种, 均由TaskTracker 启动 Map:对Map Task读到的一行数据进行处理 Reduce:对Reduce Task独到的同key数据进行处理
工作流程
- 提交作业。用户将所有应该配置的参数根据需求配置好,并通过编写的MapReduce程序通过客户端(Client)向Hadoop集群的JobTracker发送作业提交请求。作业提交之后,就会进入自动化执行,用户只能监控程序的执行情况和强制中断作业,不能对作业的执行过程进行任何干预。
- 初始化作业。JobTracker 在 JobTracker 端开始初始化工作,包括在其内存里建立一系列数据结构,来记录这个 Job 的运行情况。
- 分配任务。JobTracker 会向 HDFS 的 NameNode 询问有关数据在哪些文件里面,这些文件分别散落在哪些结点里面。JobTracker 需要按照“就近运行”原则分配任务。
- 执行任务。TaskTracker 分配到一个任务后,通过 HDFS 把作业的 Jar 文件复制到 TaskTracker 所在的文件系统,同时,TaskTracker 将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。TaskTracker 为任务新建一个本地工作目录,并把 Jar 文件中的内容解压到这个文件夹中。TaskTracker 启动一个新的 JVM 来运行每个任务(包括 Map 任务和 Reduce 任务),这样,JobClient 的 MapReduce 就不会影响 TaskTracker 守护进程。任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。
- 进程和状态的更新。一个作业和它的每个任务都有一个状态信息,包括作业或任务的运行状态,Map 任务和 Reduce 任务的进度,计数器值,状态消息或描述。任务在运行时,对其进度保持追踪。
- 作业的完成。当 JobTracker 接收到的这次作业的最后一个任务已经完成时,它会将 Job 的状态改为“successful”。当 JobClient 获取到作业的状态时,就知道该作业已经成功完成,然后 JobClient 打印信息告知用户作业已成功结束。
逻辑实体工作流
MapReduce工作流程
-
InputFormat预处理。验证输入格式是否要求;对输入文件使用InputSplit进行逻辑切片,大多数情况下,理想的分片大小是一个HDFS块(block),每个逻辑切片将由一个Map Task进行处理;通过RecordReader(RR)将切片数据转换为适合Map任务处理的键值对,输入给Map任务 -
执行Map操作。根据用户定义的Map()函数,处理数据,输出<key,value> -
Map端的Shuffle。对Map输出结果进行分区(Partition)、排序(Sort)、合并(Combine)、归并(Merge)等操作,得到<key,value-list>的中间结果 -
Reduce端的Shuffle。Reduce以<key,value-list>为输入,执行用户自定义的逻辑,输出<key,value>到OutputFormat模块 -
OutputFormat模块会验证输出目录是否已经存在、输出结果类型是否符合配置文件中的配置类型,若满足则输出值HDFS中
Shuffle
Shuffle是对Map输出结果进行分区Partition、排序Sort、合并Combiner等处理并交给Reduce的过程。Shuffle分为Map端和Reduce端两部分操作
Map端的Shuffle过程
-
写入缓存。每个Map任务会被分配一个缓存,在执行Map的过程中会将执行结果写入缓存,为提高效率不直接写入磁盘 -
溢写。当缓存已用容量达到一定比例后,就会启动溢写(Spill)操作,将缓存中的内容写入磁盘,清空缓存。 在写入磁盘前,缓存中的数据会被分区(Partition),以便均匀分配给Reduce任务出并行处理;对每个分区内的键值对,会根据key进行排序(Sort),并选择是否对数据进行合并(Combine)操作,以减少溢写到磁盘的数据量。每次溢写操作都会在磁盘中生成一个新的溢写文件 -
随着溢写文件的增多,系统会对所有溢写文件进行归并(Merge)操作以生成一个大的溢写文件。如:将<k1,v1>,<k1,v2>,<k1,v3>归并成<k1,<v1,v2,v3>>。Map端的Shuffle最终会生成一个大文件存储在本地磁盘上,这个大文件中的数据是被分区的,不同分区会被发送到不同的Reduce任务进行并行处理。
Reduce端的Shuffle过程
从Map端读取Map结果,进行归并操作,输出给Reduce任务
-
领取(Fetch)数据。Reduce任务会不断通过RPC轮询JobTracker关于Map任务是否执行完成,若有Map任务执行完毕,那么就会有相关Reduce任务来Fetch数据到本地磁盘中 -
归并(Merge)数据。从Map端取回的数据会放入Reduce任务所在节点的缓存中,当缓存区满了,那么会溢写到磁盘中形成多个磁盘文件,若磁盘文件超出一定数量则会被归并成一个大文件 -
将归并的大文件输入给Reduce任务。接下来,Reduce任务会执行用户自定义的Reduce函数,输出结果并保存到DFS中
注意:
- 不同的Map任务之间不会进行通信
- 不同的Reduce任务之间也不会发生任何信息交换
- 用户不能显式地从一台机器向另一台机器发送消息
- 所有的数据交换都是通过MapReduce框架自身去实现的
其他细节知识
split与block
split(切片)是MapReduce里的概念;而block(块)是hdfs中物理切块的大小 HDFS以固定大小的block为基本单位存储数据,而对于MapReduce而言,其处理单位是split。split只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。
- hadoop在默认的情况下,split和hdfs的block的大小是一样的,但也可以不同。可以通过配置文件进行设置:
- minsize默认大小为1,可以通过属性mapreduce.input.fileinputformat.split.minsize进行设置。
- maxsize默认大小为Long.MAXValue,可以通过属性mapreduce.input.fileinputformat.split.maxsize进行设置。
- 在mapreduce的FileInputFormat类中有个getSplits()方法对文件进行spli
参考: 2022最新黑马程序员大数据Hadoop入门视频教程_Map_Reduce_bilibili InputFormat&InputSplit&RecordReader解析-CSDN博客
|