日常使用的Hive、Pig、Cascading底层计算都是由MapReduce完成 用户也可以不使用这些工具,直接使用MapReduce执行任务,MapReduce会提供JAVA API,也提供Hadoop Streaming和Hadoop Pipes来支持其他语言的开发
该系列文章主要介绍MapReduce的设计原理: 第一篇要介绍MapReduce的下层依赖(包括HDFS存储架构和Hadoop提供的RPC功能)以及MapReduce计算框架和流程的简要介绍 第二篇主要介绍MapReduce的实现原理,包括MapReduce的各个组件和作业的运行过程
本文主要是《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》书籍的笔记,但是这本书不太推荐,感觉和其他推荐也很多的计算机书籍不是一个级别的(但是是leader推荐读的,跟工作内容也比较相关,就读了,之后可以再看看有没有更好的可以替代的书)
- 错别字比较多,有时候已经到影响阅读了
- 语句前后主语不一致,会有句子理解有歧义
- 语言不太严谨,一些首次出现的概念没有介绍
- 一些章节会给人“知道其然,不知道其所以然”的感觉,章节设计和逻辑讲解没有太顺
- 总之,就是读的时候还需要配合其他书籍或者博客一起读才能明白
一、HDFS存储
HDFS是master-slave(NameNode-DataNode)架构:
-
Client:用户使用HDFS的接口 -
NameNode:一个HDFS集群只有一个NameNode
- 存储全部文件元数据,包括fsimage(元数据镜像文件)和editlog(文件改动日志)
- 监控DataNode的健康状态,DataNode宕机后及时备份
-
Secondary NameNode:定期合并fsimage和editlog,做checkpoint -
DataNode:一个Slave节点有一个DataNode
- 负责实际的数据存储:将数据切分为多个block,一个block 64M,一个blcok以流水线的方式写到若干个DataNode上,切割过程是用户透明的,split下文会介绍,与map task相关
二、MapReduce计算
1. MapReduce总体架构
MapReduce是Master-Slave(JobTracker-TaskTracker)架构
- Client:用户提交作业,监控作业,一个作业是一个Job
- JobTracker
- 监控TaskTracker的健康状态,出现问题后,及时把作业转移至其他节点
- 跟踪TaskTracker的资源使用和作业进度,告诉TaskScheduler
- TaskScheduler负责调度任务,是一个可插拔模块
- TaskTracker
- 真正执行任务的节点,接受JobTracker命令,杀死/启动节点
- 一个TaskTracker上有多个slot(CPU、内存资源)
- MapTask
- 从HDFS读取input block,每个Map Task读取一个split(DataNode存储图中出现过)的数据
- 对每一个split执行用户定义的map函数,执行结果分成若干个partition,存储至本地磁盘,一个partition被一个reduce task处理
- ReduceTask
- 读取MapTask的计算结果(Shuffle阶段)
- 将读取的key-value排序(Sort阶段)
- 执行用户定义的reduce函数,将结果存储至HDFS上(Reduce阶段)
2. MapReduce计算模型
使用MapReduce,需要配置5个组件,下个小节会详细介绍
- InputFormat:输入格式,数据分片逻辑
- Mapper:对应图的map task
- Partitioner:对应图的group by sorting 对应哪个key-value发给哪个reduce task
- Reducer:对应图的reduce task
- OutputFormat:输出格式
- Canbiner:优化MapReduce性能 不是必备组件
思考
-
如何使用MapReduce解决top-k问题:查询出现频率最高的k个单词 将数据集分为多个数据块,每个数据块送入一个map task map task1:统计各个单词出现的频率 reduce task1:汇总各个单词出现的频率 – 每个map task2对应一个reduce task1 map task2:输出出现频率最高的k个单词 reduce task2:负责把多个map task输出的单词汇总再排序输出前k个数据 -
如何使用MapReduce解决K-means问题:将N个对象划分为K个聚类 随机选择K个点作为聚类的中心点 map task:计算每个节点到K个中心点的距离,并选择最近的中心点作为其聚类一员 reduce task:计算每个聚类的平均坐标,作为K个中心点 重复map-reduce过程直至到达重复上限或数据点到中心点的距离和最小 -
如何使用MapReduce解决Fibonacci数值计算 无法解决
结论 无法将该问题划分成若干个不相关的子问题,就无法使用MapReduce解决 map task和reduce task的区别是什么?
下面将详细介绍这5个组件
3. MapReduce编程模型
下面将详细MapReduce编程需要5个组件:InputFormat、Mapper、Partitioner、Reducer和OutputFormat
-
InputFormat 主要功能:定义输入数据按照什么逻辑切分为split,以确定split的个数和map task的个数(刷一个split对应一个map task),同时实现为map函数提供输入,给定某个split,要可以返回这个split对应的数据
-
Mapper 包括初始化、Map操作和清理三部分。初始化可以使用jobConf的初始化参数对Mapper初始化;map()输入key-value,输出key-value和reporter;通过Closeable接口对Mapper进行清理 -
Partitioner 主要作用:对Mapper产生的中间结果分片,将同一组的数据送入同一个Reducer(不一定一个Reducer只处理一个分片),分片结果会影响到Reducer的负载均衡,MapReduce提供了两种分片方式:
- HashPartitioner:由key的hash值决定数据划分到哪一个partition
- TotalOrderPartitioner:由key值大小决定划分到哪一个partition,将key值从小到大“均分”到多个partition,到做到完全均分必须要知道所有key值分布,所以采用抽样的方法查看key值分布,抽样方法有IntercalSampler、RandomSampler、SplitSampler等,将采样数据n等分,找出n等分点,每个Mapper的输出结果按n等分点分片,每个Reducer对分片再进行内部排序,最后就可以做到全排序
-
Reducer 和Mapper一样,其数目由用户通过参数mapred.reduce.tasks(默认数目为1)指定
以上是一个完整的job(MapReduce作业)介绍,下面介绍扩展,复杂的DAG(有向无环图)情况
- 如果是多个有依赖关系的job可以使用JobControl一次性提交多个job,JobControl会按照job之间依赖关系进行作业调度
- 允许一个Map或Reduce阶段存在多个Mapper(不能存在多个Reducer),可以使用ChainMapper或ChainReducer
三、Hadoop RPC
Hadoop RPC采用基于事件驱动的Reactor设计模式,实现使用到NIO、反射+动态代理机制、Java网络编程等
1. Hadoop RPC使用方法
这部分没太懂,待补充
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
public static final long versionID=1L;
String echo(String value)throws IOException;
int add(int v1,int v2)throws IOException;
}
public static class ClientProtocolImpl implements ClientProtocol {
public long getProtocolVersion(String protocol, long clientVersion) {
return ClientProtocol.versionID;
}
public String echo(String value)throws IOException {
return value;
}
public int add(int v1,int v2)throws IOException {
return v1+v2;
}
}
server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort, numHandlers, false, conf);
server.start();
proxy = (ClientProtocol)RPC.getProxy(ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result = proxy.add(5,6);
String echoResult = proxy.echo("result");
在Hadoop中,JobTracker和NameNode分别是MapReduce和HDFS两个子系统中的RPC Server
2. MapReduce中的六大通信协议
- JobSubmissionProtocol:Client(用户)与JobTracker的通信协议,用户提交作业、监控作业运行情况
public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts)throws IOException;
setJobPriority()
killJob()
killTask()
public ClusterStatus getClusterStatus(boolean detailed)throws IOException;
public JobStatus getJobStatus(JobID jobid)throws IOException;
public JobStatus[] getAllJobs()throws IOException;
- InterTrackerProtocol:TaskTracker与JobTracker的通信协议,汇报节点使用情况和任务运行情况
HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted,boolean initialContact,boolean acceptNewTasks,short responseId) throws IOException;
TaskCompletionEvent[]getTaskCompletionEvents(JobID jobid, int fromEventId ,int maxEvents)throws IOException;
public String getSystemDir();
public String getBuildVersion()throws IOException;
- TaskUmbilicalProtocol:Task与TaskTracker间的通信协议,Task是同节点TaskTracker的子进程
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus,JvmContext jvmContext)throws IOException, InterruptedException;
boolean ping(TaskAttemptID taskid, JvmContext jvmContext)throws IOException;
- RefreshUserMappingProtocol:Client(管理员)更新用户-用户组映射关系
- RefreshAuthorizationPolicyProtocol:Client(管理员)更新MapReduce服务级别的访问控制列表,如哪些用户可以使用JobSubmissionProtocol协议
- AdminOperationsProtocol:Client(管理员)更新队列访问控制列表(哪些用户可以向哪些队列提交任务)和节点列表(设置节点白名单和黑名单)
|