0. 引入
MapReduce的工作流程大致如下图示:
1. 概述
1.1 分布式并行编程
-
传统并行 传统的程序都是以单指令、单数据流的方式顺序执行 虽然这种方式比较符合人类的思维习惯, 但是,其性能受到单台机器的限制,可扩展性较差 -
分布式并行 分布式并行程序可以运行在由大量计算机构成的集群上 从而可以充分利用集群的并行处理能力 而且可以向集群中增加新的计算节点来进行扩充
1.2 MapReduce模型简介
MapReduce的核心是两个函数:Map和Reduce,其核心思想源自函数式编程语言。
MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢"
因为数据需要用大量的网络传输,而这大规模数据环境下的开销更为庞大
即MapReduce框架会尽量将Map程序就近地在HDFS数据所在的节点运行
也就是将计算节点和存储节点放在一起运行,从而减少了节点间的数据移动开销。
MapReduce框架采用了Master/Slave架构,包括一个Master和若干个Slave
其作业的过程中,包括数据的输入、处理、输出,涉及到的主要组件如下:
-
客户端 提交MapReduce作业 -
JobTracker 运行在Master上,负责作业和任务的调度 监控它们的执行,并重新调度已经失败的任务 -
TaskTracker 运行在Slave上,负责执行由JobTracker指派的任务 -
yarn资源管理器 负责集群上计算资源的协调 -
yarn节点管理器 负责启动和监控集群中机器上的计算容器(container) -
HDFS 分布式文件系统,负责与其他实体共享作业文件
1.3 Map和Reduce函数
两者都是将<key, value> 作为输入和输出,具体的内容由开发者进行构建
函数 | 输入 | 输出 | 说明 | Map | <k1,v1> 如:<行号,”a b c”> | List(<k2,v2>) 如:<“a”,1> | 1、将小数据集进一步解析成一批<key,value>对,输入Map函数中进行处理 2、每一个输入的<k1,v1>会输出一批<k2,v2>。<k2,v2>是计算的中间结果 | Reduce | <k2,List(v2)> 如:<“a”,<1,1,1>> | <k3,v3> 如:<“a”,3> | 输入的中间结果<k2,List(v2)>中的List(v2)表示是一批属于同一个k2的value |
本来是想援引一下教程的示意图,但是发现图没给全,只有Map的部分
我也尝试着去网上找了一下这套原图,没找到,那就不放了,看下文的流程吧
2. MapReduce的工作流程
2.1 工作流程概述
不多哔哔,直接看图吧: 需要注意的有:
-
不同的Map任务之间不会进行直接的点对点通信 -
不同的Reduce任务之间也不会进行直接的信息交换 -
用户不能显式地从一台机器向另一台机器发送消息 -
所有的数据交换都是通过MapReduce框架自身去实现的 -
Map的输入、Reduce的输出都保存在分布式文件系统中 -
而Map任务的中间结果则保存在本地存储(如磁盘)中
2.2 MapReduce的各个执行阶段
-
数据检验 数据输入map之前,要先经过InputFormat模块的校验 比如,验证输入的格式是否符合输入定义 -
数据划分 然后,将输入文件切分为逻辑上的多个InputSplit块 InputSplit是MapReduce对文件进行处理和运算的输入单位 这里只是逻辑上的区域划分,实际上并没有对文件进行切割 InputSplit只是记录了划分的各个小块的数据的位置和长度 -
数据分割 因为InputSplit是对数据的逻辑划分而非对文件的实际切分 所以还要通过RecordReader(RR)处理InputSplit的具体记录 RR会根据逻辑划分的结果,加载不同区域的数据到不同的地方 并将其转换为适合Map任务读取的键值对,输入给相应的Map任务 -
Map阶段 Map任务会根据用户自定义的map函数的映射规则 输出一系列的<key,value>作为中间结果,并存储在本地 -
Shuffle阶段 为了让Reduce可以并行处理Map的结果,需要对Map的输出进行处理 例如分区、排序(Sort)、合并(Combine)和归并(Merge)等操作 得到<key,value-list>形式的中间结果,再交给对应的Reduce程序进行处理 -
Reduce阶段 Reduce以一系列<key,value-list>中间结果作为输入 执行用户定义的函数,输出结果给OutputFormat模块。 -
OutputFormat阶段 OutputFormat模块会验证输出目录是否已经存在 以及输出结果类型是否符合配置文件中的配置类型 如果都满足,就输出Reduce的结果到分布式文件系统
2.3 Shuffle过程详解
2.3.1 Shuffle过程简介
依照上文,Shuffle是处理Map的结果并交给Reduce的阶段
因此,Shuffle过程分为Map端的操作和Reduce端的操作
-
在Map端的Shuffle过程 Map的输出先被写入缓存,缓存满时会启动溢写操作来清空 溢写操作启动后,会先对缓存中的数据进行分区 再对每个分区进行排序(Sort)和合并(Combine)处理 最后再将数据写入磁盘文件,并清空缓存 每次溢写的操作都会生成一个新的磁盘文件 随着Map任务的执行,磁盘中就会生成多个溢写文件 在Map任务全部结束之前,其会被归并(Merge)成一个文件 然后,通知Reduce任务来从文件领取其要处理的相应分区 -
在Reduce端的Shuffle过程 Reduce任务从Map端的不同Map机器领回属于自己需要处理的那部分数据 然后,Shuffle会对数据进行归并(Merge)后交给具体的Reduce函数处理
2.3.2 Map端的Shuffle过程
2.3.2.1 输入数据和执行Map任务
Map任务的数据输入输入一般来自在分布式文件系统的文件块
这些文件块的格式是任意的,可以是文档,也可以是二进制格式的
Map任务接受<key,value>作为输入后,按映射规则进行转换输出
2.3.2.2 写入缓存
每个Map任务都会被分配一个缓存用来写入中间结果
待其积累到一定数量,再一次性批量写入磁盘
这样可以大大减少该作业对磁盘I/O的影响
因为磁盘通过磁头移动和盘片的转动来寻址,开销很大
如果每次Map的输出结果都写入磁盘,那会引入多次寻址开销
而一次性批量写入只需要一次寻址,连续写入,大大降低了开销
需要注意的是,在写入缓存之前,key与value值都会被序列化成字节数组
2.3.2.3 溢写(分区、排序和合并)
-
溢写时机 提供给MapReduce的缓存的容量是有限的,默认大小是100MB 随着Map任务的执行,缓存中Map结果的数量会不断增加,很快就会占满整个缓存 这时,就必须启动溢写(Spill)操作,把缓存中的内容批量写入磁盘并清空缓存 溢写的过程通常是由另外一个单独的后台线程来完成的,不影响往缓存写入Map结果 但为了使得Map结果能够不停地持续写入缓存,就必须保证缓存中一直有可用的空间 所以,一般会设置一个溢写比例来作为启动条件,如0.8,不能等到全满才启动溢写 也就是说,当100MB大小的缓存被填满80MB数据时,就启动溢写过程 把已经写入的80MB数据写入磁盘,剩余20MB空间供Map结果继续写入 -
分区操作 在溢写到磁盘之前,缓存中的数据首先会被分区(Partition) 缓存中的数据是<key,value>形式,要交给不同的Reduce来并行处理 MapReduce通过Partitioner接口对这些键值对进行分区 默认的分区方式是对key进行哈希后再用Reduce的数量取模 可以表示成hash(key) mod R,R表示Reduce任务的数量 这样可以把Map输出结果均匀地分配给这R个Reduce任务去并行处理 MapReduce也允许用户通过重载Partitioner接口来自定义分区方式 -
排序操作 对于每个分区内的所有键值对,后台线程会根据key对它们进行内存排序(Sort) -
合并操作 MapReduce默认的排序操作后,还包含一个可选的合并(Combine)操作 可选意味着MapReduce需要检查用户是否定义了Combiner函数,即是否选择执行 如果函数有定义,那就会执行合并操作,从而减少需要溢写到磁盘的数据量 所谓“合并”,是指将那些具有相同key的<key,value>的value加起来 比如<“xmu”,1>和<“xmu”,1>可以经过合并得到一个键值对<“xmu”,2> 这里需要注意,Map端的这种合并和Reduce的功能十分地近似但又不同 由于这个操作发生在Map端,所以我们称之为“合并”从而有别于Reduce 虽然Combiner是用户的可选操作,但并非所有场合都可以使用Combiner 因为Combiner的输出是Reduce任务的输入,并不是最终的结果 所以Combiner绝不能影响到Reduce任务生成最终的计算结果 也就是Reduce输入的<key,value>与输出<key,value>类型一致 这样就不影响最终结果,例如累加、最大值等场景,但平均数不行
经过分区、排序以及可能发生的合并操作之后
这些缓存中的键值对就可以被写入磁盘并清空缓存
每次溢写操作都会在磁盘中生成一个新的溢写文件
写入溢写文件中的所有键值对,都是经过分区和排序的
2.3.2.4 文件归并
每次溢写操作都会在磁盘中生成一个新的溢写文件spill
随着MapReduce任务的进行,磁盘中spill的数量会越来越多
当然,如果Map输出结果很少,磁盘上只会存在一个溢写文件
总之在Map任务结束之前,所有spill会被归并为一个大的文件
这个大的溢写文件中的所有键值对,也是经过分区和排序的
所谓归并(Merge),是指对于具有相同key的键值对,会被归并成一个新的键值对
例如相同key的<k1,v1>···会被归并成<key, >:<k1, <V1,V2···> >
注意,如果用户定义了Combiner函数,也就是选择了执行Combiner操作
那么MapReduce会检查min.num.spills.for.combine和当前的spill数量
如果磁盘中已经生成的spill的数量超过该参数(默认值是3)
那就再次运行Combiner,从而减少写入磁盘的数据量
但是如果数量小于该参数,MapReduce会认为Combiner“得不偿失”而不执行
注意,这个操作是在溢写的小文件执行Merge之前进行的第二次Combiner
而第一次Combiner则是在数据排序后、溢写到磁盘前去执行的
这个在教程的原文当中并没有写的很清楚
2.3.2.5 总结
经过上述4个步骤以后,Map端的Shuffle过程全部完成
最终生成的一个大文件会被存放在本地磁盘
这个大文件中的数据是已经被分区好了的
不同的分区会被发送到不同的Reduce任务进行并行处理
注意,JobTracker会一直监测Map任务的执行
每当其监测到有一个Map任务完成后
就会立即通知相关的Reduce来“领取”数据
然后开始在Reduce端执行Shuffle过程
2.3.3 Reduce端的Shuffle过程
-
“领取”数据 Map端的Shuffle过程结束后,所有Map输出结果都保存在Map机器的本地磁盘上 Reduce任务需要把这些数据“领取”(Fetch)回来,存放到自己所在机器的本地磁盘上 因此,在每个Reduce任务真正开始之前,它大部分时间都在从Map“领取”自己的数据 每个Reduce任务会不断地通过RPC向JobTracker询问Map任务是否已经完成 JobTracker监测到一个Map任务完成后,就会通知相关的Reduce任务来“领取”数据 一般系统中会存在多个Map机器,因此Reduce会用多线程同时从多个Map领回数据 -
归并数据 从Map端领回的数据,会首先被存放在Reduce任务所在机器的缓存中 由于在Shuffle阶段,Reduce任务还没有真正开始执行 所以这时可以把大部分内存分配给Shuffle过程作为缓存 如果缓存被占满,就会像Map端一样被溢写到磁盘中 从多个Map机器领回的数据中,一般会存在很多可以合并(Combine)的键值对 如果用户定义了Combiner,还可以先执行合并操作,减少写入磁盘的数据量 需要注意的是,这里是类似Map的第一次Combiner,在Merge小文件之前进行 每个溢写过程都会生成一个溢写文件,因此磁盘上会存在多个溢写文件 最终当所有数据都已经被领回时,多个溢写文件会被归并成一个大文件 归并的时候还会对键值对进行排序,从而使得最终大文件中的键值对都是有序的 当然,在数据很少的情形下,缓存就可以存储所有数据,就不需要把数据溢写到磁盘 而是直接在内存中执行归并操作,然后直接输出给Reduce任务 需要说明的是,把磁盘上的多个溢写文件归并成一个大文件,可能需要执行多轮归并 每轮归并可以归并的数量由参数io.sort.factor来控制(默认值是10) 若磁盘有50个溢写文件,每轮可以归并10个,则需要经过5轮,得到5个大文件 -
把数据输入Reduce任务 磁盘中经过多轮归并后得到的若干个大文件,不会继续归并成一个新的大文件 而是直接输入给Reduce任务,这样可以减少磁盘读写开销,整个Shuffle到此结束 Reduce会执行Reduce函数中定义的各种映射,将最终结果保存到分布式文件系统中
2.4 总结
嗯,反正教程的这个位置是这个图,我感觉这意思就是总结
3. WordCount示例
-
检查任务属性 首先,需要检查WordCount程序任务是否可以采用MapReduce来实现 适合用MapReduce框架来处理的数据集,都需要满足一个前提条件 即:待处理的数据集可以分解成许多小的数据集,而且互不关联 这样才能并行地处理每一个小数据集,也就是执行MapReduce流程 在WordCount程序任务中,不同单词之间的频数不存在相关性,彼此独立 可以把不同的单词分发给不同的机器进行并行处理 因此,词频统计任务可以采用MapReduce来实现 -
确定程序思路 其次,确定MapReduce程序的设计思路。 简单来说就是把文件内容解析成许多个单词 然后把所有相同的单词聚集到一起。 最后计算出每个单词出现的次数进行输出 -
捋清运行流程 最后,确定MapReduce程序的执行过程 先把一个大文件切分成许多个分片,每个分片输入给不同机器上的Map任务 各个分片的内容互不关联,各Map并行执行“从文件中解析出所有单词”的任务 Map的输入采用Hadoop默认的<key, value>输入方式 即文件的行号作为key,文件的一行作为value Map的输出以单词作为key,1作为value 即<单词,1>,表示单词出现了1次。 Map阶段完成后,会输出一系列<单词,1>这种形式的中间结果 然后,Shuffle阶段会对这些中间结果进行排序、分区 得到<key, value-list>的形式,如< hadoop, <1,1,1,1,1> > 然后再将<key, value-list>分发给不同的Reduce任务 Reduce任务接收对应的的中间结果(一系列键值对) 再计算得到每个单词的频数并把结果输出到分布式文件系统
3.1 整体流程图
3.2 简易MapReduce流程
3.3 数据分片过程
根据上文,而我们可以知道这里是Inputformat 和RecordReader :
-
Inputformat的作用: 加载、读取HDFS中的文件,对输入进行格式验证; 将大文件切分成许多分片split,但此切分仅是逻辑上的切分 即逻辑定义每个split的起点和长度,并非真正意义的物理切分 -
RecordReader: 记录阅读器 根据Inputformat切分出的split的位置和长度 以<k,v>形式从HDFS中的各个块读取相关分片
3.4 WordCount流程图
原教程这里有MapReduce的一般流程,还是和WordCount的工作流程混着的
结构的安排真的好奇怪啊,混着就算了,我感觉顺序还不对啊
我做了一下拆分,这里只有WordCount,MapReduce见下文
3.4.1 WordCount的split
3.4.2 WordCount的Map
3.4.3 WordCount的Reduce
3.5 详细MapReduce流程
我把上面图片当中MapReduce部分的抽出放在了这里
3.5.1 总图
3.5.2 Map
3.5.3 Shuffle
3.5.4 Reduce
3.6 MapReduce的体系结构
总之…先放个图吧,整体结构
3.6.1 Client(客户端)
-
提交作业: 用户编写的MapReduce程序通过Client提交到JobTracker端。 -
查看作业状态: 用户可通过Client提供的一些接口查看作业运行状态。
3.6.2 JobTracker(作业跟踪器)
-
资源监控 JobTracker监控所有TaskTracker与Job的健康状况 一旦发现节点失效(通信失败或节点故障) 就将相应的任务转移到其他节点 -
作业调度 JobTracker会跟踪任务的执行进度、资源使用量等信息 并将这些信息告诉任务调度器(TaskScheduler) 而任务调度器会选择合适的(比较空闲)节点资源来执行任务
3.6.3 TaskScheduler(任务调度器)
-
工作调度 一般接收JobTracker发送过来的命令 选择合适的(比较空闲)节点资源来执行任务 即将各个TaskTracker上的空闲slot分配给Task使用 -
资源监控 将自身的资源信息及任务的运行进度发送给JobTracker 这一信息的发送方式称为心跳,也就是heart beat
3.6.4 TaskTracker(任务跟踪器)
-
资源监控 TaskTracker会将本节点的资源信息和任务的运行进度汇报给JobTracker 这一汇报方式也是周期性地通过“心跳”机制来传递相关信息的 -
管理任务 接收JobTracker的命令,并执行相应的操作,如启动新任务、杀死任务等 -
管理机制 TaskTracker使用slot等量划分本节点上的资源量,如CPU、内存等 一个Task需要获取到一个slot之后,才有机会运行运行这个任务 slot分为Map slot和Reduce slot两种,分别供Map Task和Reduce Task使用
3.7 总结
我觉得有必要针对这个第三部分WordCount写一个总结,我感觉教程的结构很神奇
就是,总给我一种一会儿举例子,一会儿讲机制的感觉,但是机制不是已经讲过了?
现在这个第三部分,就是讲WordCount实例啊,应该是深入描述这个东西吧
总之感觉挺神奇的,我对顺序做了微调,整体的结构还是按照原教程来的
|