为了解决大量数据的计算问题,分布式计算诞生了。在MapReduce出现前其实就存在了分布式计算的模式,但是MapReduce为分布式计算实现了一套通用化的流程与规范。MapReduce是Hadoop架构下的计算层,它把任务分割成小任务并分发到集群的机器上并行执行。
1.MapReduce的诞生
MapReduce是最先由Google提出的分布式计算软件构架,它可以支持大数据量的分布式处理,是Google大数据领域“三辆马车”之一,MapReduce借助分治的思想,开创性地将分布式计算过程划分为两个阶段:Map阶段和Reduce阶段,它将大数据计算任务拆分为大量的小数据计算任务,然后进行并行的分布式计算,计算获得的结果会被合并从而获得最终的计算结果,这也是MapReduce名称的由来。
MapReduce不仅是一个计算框架,更是一种计算思想。
MapReduce位于Hadoop架构中的计算层,与HDFS配合完成分布式计算工作,是Hadoop体系的核心组成部分:
MapReduce的优势和能做到的:
- MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。 就是因为这个特点使得 MapReduce 编程变得非常流行。 - 良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。 - 高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上面上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。 - 适合 PB 级以上海量数据的离线处理
它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce 很难做到。
MapReduce不适合做的:
这里的不擅长不代表它不能做,而是在有些场景下实现的效果差,并不适合 MapReduce 来处理,主要表现在以下几个方面:
- 实时计算
MapReduce 无法像 Mysql 一样,在毫秒或者秒级内返回结果。 - 流式计算
流式计算的输入数据时动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。 - DAG(有向图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后,每个MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
所以,MapReduec的特性就决定了基于Hadoop体系/MapReduce体系的分布式计算技术特性为离线、批处理计算,而不适合用于实时、流式计算。
2.MapReduce的架构与工作流程
MapReduce的计算思想是分而治之,既然数据很大,那么就把大数据拆成若干小数据进行并行计算,最后对小数据的计算结果做合并:
MapReduce计算过程只需要用户定义 Map 和 Reduce 两个过程,map 的主要输入是一对 <Key, Value> 值,经过 map 计算后输出一对 <Key, Value> 值;然后将相同 Key 合并,形成 <Key, Value 集合 >;再将这个 <Key, Value 集合 > 输入 reduce,经过计算输出零个或多个 <Key, Value> 对。
以一个WordCount程序来看MapReduce是如何运行的,以下是WordCount程序MapReduce的Java实例代码:
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}
map 函数的计算过程是,将这行文本中的单词提取出来,针对每个单词输出一个 <word, 1> 这样的 <Key, Value> 对。MapReduce 计算框架会将这些 <word , 1> 收集起来,将相同的 word 放在一起,形成 <word , <1,1,1,1,1,1,1…>> 这样的 <Key, Value 集合 > 数据,然后将其输入给 reduce 函数。
reduce 函数的计算过程是,将这个集合里的 1 求和,再将单词(word)和这个和(sum)组成一个 <Key, Value>,也就是 <word, sum> 输出。每一个输出就是一个单词和它的词频统计总和。 一个 map 函数可以针对一部分数据进行运算,这样就可以将一个大数据切分成很多块(这也正是 HDFS 所做的),MapReduce 计算框架为每个数据块分配一个 map 函数去计算,从而实现大数据的分布式计算。
比如两个hdfs文件为:
Hello World
Bye World
Hello Hadoop
Bye Hadoop
首先会运行两个Map任务,对其分别进行解析,从而得到以<单词,次数>这样的键值对:<Hello, 1>、<Hello, 1>、<World, 1>、<World, 1>、<Bye, 1>、<Bye, 1>、<Hadoop, 1>、<Hadoop, 1>,然后以Key作为聚合,获得<单词,List<次数>>这样的键值对,也就是获得<Hello, [1, 1]>、<World, [1, 1]>、<Bye, [1, 1]>和<Hadoop, [1, 1]>,最后将其作为输入传递给Reduce任务,Reduce任务以单词作为纬度,对List进行聚合,最后就形成<Hello, 2>、<World, 2>、<Bye, 2>和<Hadoop, 2>的词频统计结果了:
MapReduce的架构核心组成(类、实体或进程)包括JobScheduler、JobTracker、TaskTracker、Mapper、Reducer、InputFormat、InputSplit、RecordReader、Partitioner、Combiner、Shuffle、OutputFormat等:
首先是三个进程JobTracker(JobScheduler)、TaskTracker及Map/Reduce Worker进程:
- JobTracker进程:根据要处理的输入数据量,调配TaskTracker进程启动相应数量的Map和Reduce进程任务,其中的JobScheduler进程管理整个作业生命周期的任务调度和监控。这是Hadoop集群的常驻进程,需要注意的是,JobTracker进程在整个Hadoop集群全局唯一。
- TaskTracker进程:负责启动和管理 Map Worker进程以及Reduce Worker进程。因为需要每个数据块都有对应的 map 函数,TaskTracker进程通常和HDFS的DataNode进程启动在同一个服务器。也就是说,Hadoop 集群中绝大多数服务器同时运行DataNode进程和TaskTracker进程。
- Map/Reduce Worker进程:也就是用户编写的Map和Reduce任务进程,分别在对应的DataNode节点中执行Map任务和Reduce任务的工作,其中Map进程会有很多个,但是Reduce进程一般只有一个。
以一个MapReduce任务为例,以下是核心的工作流程:
- 应用进程JobClient将用户作业JAR包存储在HDFS中,将来这些JAR包会分发给Hadoop集群中的服务器执行MapReduce计算。
- 应用程序提交Job作业给 JobTracker。
- JobTracker根据作业调度策略创建JobInProcess树,每个作业都会有一个自己的JobInProcess树。
- JobInProcess根据输入数据分片数目(通常情况就是数据块的数目)和设置的 Reduce 数目创建相应数量的 TaskInProcess。
- TaskTracker进程和JobTracker进程进行定时通信。
- 如果TaskTracker有空闲的计算资源(有空闲 CPU 核心),JobTracker就会给它分配任务。分配任务的时候会根据TaskTracker的服务器名字匹配在同一台机器上的数据块计算任务给它,使启动的计算任务正好处理本机上的数据。
- TaskTracker收到任务后根据任务类型(是 Map 还是 Reduce)和任务参数(作业 JAR 包路径、输入数据文件路径、要处理的数据在文件中的起始位置和偏移量、数据块多个备份的 DataNode 主机名等),启动相应的Map或者Reduce Wokrer进程。
- Map或者Reduce Worker进程启动后,检查本地是否有要执行任务的JAR包文件,如果没有,就去 HDFS上下载,然后加载Map或者Reduce代码开始执行。
- 如果是Map进程,从HDFS读取数据(通常要读取的数据块正好存储在本机);如果是Reduce进程,将结果数据写出到HDFS。通过这样一个计算旅程,MapReduce 可以将大数据作业计算任务分布在整个Hadoop集群中运行,每个Map计算任务要处理的数据通常都能从本地磁盘上读取到。
3.MapReduce核心组件原理
先回顾一下Map和Reduce任务的执行流程:
MapReduce 的大概的工作流程如上图所示,其中方块表示slave节点,所以这里有 3 个slave节点。 mapper 运行在 3 个 slave节点上,而 reducer 在任意一个slave 运行,上图为了简单起见,把 reducer 进程画在一个方块里,看起来是运行在一个不同的机器上,其实它是在 mapper 的节点上运行的。
Map 任务是 MapReduce 作业的第一个执行阶段。默认情况下,一个 mapper 每次处理的分片数据(split)都是一个 HDFS 数据块,mapper 的输出数据会被写到本地机器的磁盘上。mapper 跑完之后,mapper 输出的结果数据会到 reducer 节点,即运行 reducer 的机器。
Reducer 是 MapReduce 作业的 第二个执行阶段。它的计算结果将会直接落地到 HDFS。
在默认情况下,一个 slave 每次可以跑 2 个 mapper(可以根据需要调高这个值),而 slave 同时能跑多少个 mapper 取决于很多因素,比如,机器的硬件配置,HDFS 块大小等。所以建议不要把这个值调太高,因为这会降低 MapReduce 运行性能。
MapReduce 的 Mapper 会把它的输出结果写到本地磁盘。这个输出结果是临时数据,也叫做中间输出结果。所有的 mapper 都会把输出数据写到本地磁盘。mapper 执行完之后,mapper 输出的数据会从 mapper 节点传输到 reducer 节点,这个过程被称为 shuffle。
Reducer 也是运行在集群的任意一个 datanode 的。所有 mapper 的输出数据都会到 reducer。这些来自不同 mapper 的输出数据会被合并,并作为 reducer 的输入数据。这些合并后的数据还是存储在 mapper 所在节点的磁盘的。Reducer 是 MapReduce 框架提供的另一个你能实现自己业务逻辑的接口,通常我们会在 Reducer 做数据聚合,相加等操作。因此,Reducer 会把最终结果数据写到 HDFS。
map 和 reduce 是执行 MapReduce 作业的两个数据处理阶段。所有 mapper 执行完之后,reducer 才能开始执行。
MapReduce的核心组件完成了Map Worker读取HDFS输入数据格式化、Map临时结果落盘Shuffle到Reduce Worker等核心操作,是Map-Reduce计算过程的核心完成类,下面是以核心类为维度,看一次MapReduce计算过程:
-
输入文件 首先,MapReduce 任务的目的是处理数据,那数据从哪里来?一般一个 MapReduce 任务的输入数据是来自于 HDFS 文件,这里的数据文件就叫做 MapReduce 任务的输入文件,而 HDFS 上文件的格式多种多样,比如有文本文件,二进制文件等。 -
InputFormat InputFormat 是 MapReduce 框架的一个类,它对输入文件进行分割和读取,并创建数据分片 InputSplit。 -
InputSplit InputSplit 对象即数据分片对象,由 InputFormat 生成的,一个数据分片由一个 Mapper 来处理,数据分片是逻辑上的划分,并非物理分割。每一个分片都会相应创建一个 map 任务,因此,map 任务的数量等于分片的数量,即有多少个分片就有多少个 map 任务。分片会被划分成记录,并且每个记录都会被对应 mapper 处理。 -
RecordReader 它会跟 InputSplit 交互,并把数据转换成适合 mapper 读取的键值对(key-value pair)记录。默认情况下,它用的是 TextInputFormat 类来做转换。RecordReader 与 InputSplit 交互一直到文件读取完成。它会给文件的每一行数据分配一个字节偏移量(byte offset)作为唯一编号。后续这些键值对将被发送给 mapper 做进一步处理。 -
Mapper 它负责处理每一个来自 RecordReader 的记录,并生成新的键值对数据,这些 Mapper 新生成的键值对跟输入键值对是不一样的。Mapper 的输出也就是我们前面说的中间结果将会被写到本地磁盘。Mapper 的输出数据并不是存储在 HDFS 的,因为这是临时数据,如果把临时数据写到 HDFS ,将造成不必要的复制,会导致 map 任务性能低下。Mapper 的输出数据被传输给 Combiner 做下一步处理。 -
Combiner combiner 其实是一种 reduce 操作。它会对 mapper 的输出数据做本地聚合,也就是说它是在输出数据的 mapper 所在的机器上执行的。主要为了减少 mapper 和 reducer 之间的数据传输。combiner 执行完成之后,它的输出结果就会被传输到 partitioner 做下一步处理。 -
Partitioner 如果一个 MapReduce 作业在 reduce 阶段有多个 reducer 任务参与,才会有 Partitioner 这一步,即数据分区。如果只有一个 reducer 任务,Partitioner 是不会执行的,即不会对数据分区。 Partitioner 对来自 combiner 的输出数据分区并排序,其实就是对数据的 key 做哈希运算,具有相同 key 的记录会被分到相同的分区,然后每个分区会被发送给 reducer。 -
Shuffle 和排序 现在,Partitioner 的输出被 shuffle 到 reduce 节点( 这里的 reduce 节点其实就是正常的 slave 节点,由于在上面跑 reduce 任务所以才叫 reduce 节点)。shuffle 是对数据进行跨网络的物理移动,需要消耗网络带宽资源。在所有 mapper 都完成之后,他们的输出数据才会被 shuffle 到 reduce 节点,并且这些 mapper 产生的数据会被合并和排序,然后作为 reduce 阶段的输入数据。 -
Reducer 在 reduce 阶段,它把 mapper 输出的键值对数据作为输入,然后对每个键值对数据记录应用 reducer 函数并输出结果。reducer 的输出数据是 MapReduce 作业的最终计算结果,它会被存储到 HDFS。 -
RecordWrite 它负责把来自 Reducer 输出的键值对数据写到输出文件。 -
OutputFormat RecordWriter 将 Reducer 输出的键值对写入输出文件的方式由 OutputFormat 决定。OutputFormat 是由 Hadoop 提供的用于把数据写到 HDFS 或者本地磁盘的接口。因此,reducer 的最终输出数据是由 Outputformat 实例负责写入到 HDFS 的。
4.MapReduce本地化计算
在MapReduce的设计思想中,认为与其移动大量的数据到计算程序所处位置进行计算,不如把计算程序分发到大数据所在位置进行本地计算,也就是“移动计算比移动数据便宜”,这就是MapReduce本地化计算的含义,通过分发计算程序到数据所在地进行本地计算,能够大大地减少IO时间,因为避免了数据的移动,降低网络开销,增加系统的整体吞吐量。
在 Hadoop 中,数据集是存储在 HDFS 的。而数据集会被切割成很多块,这些块会被存储在 Hadoop 集群的不同节点。在执行 MapReduce 作业的时候,NameNode 把这些 MapReduce 程序的代码发送给与该程序代码将要处理的数据所在 的 datanode 上。这个过程可以结合下面的图来理解。
为了达到数据本地化的所有优势,系统架构需要满足下面的条件:
- 首先,集群应该具备合理的拓扑结构。Hadoop 代码必须具有读取数据位置的能力。
- 其次,Hadoop 必须对集群内执行任务的节点的拓扑结构具有感知能力。而且 Hadoop 必须知道数据存储的具体位置。
数据本地化有下面几种不同情况:
- Data Local
数据和计算代码在同一个节点内,在这种情况下,计算和数据距离非常近,计算操作数据不需要经过网络,这是最优的情况。 - Intra-Rack
由于资源有限,mapper 不可能总是在同一个节点执行。这种情况下,mapper 在同一个机架的不同节点运行是相对比较好的。 - Inter-Rack
有时由于资源限制,不能在同一个机架内的不同节点执行 mapper,所以只能在不同机架的不同节点执行 mapper。这种情况相对来说是性能最差的情况。
|