概述:
主要内容:了解Hadoop、MapReduce、HDFS
介绍一下这三者:
Hadoop的核心就是HDFS和MapReduce
配置文件详解:
- fs.default.name :hdfs文件系统的URL,主机是namenode的主机名称或IP地址,端口号是namenode监听RPC的端口,默认端口8020
- dfs.name.dir 指定namenode的存储目录
- dfs.data.dir 指定datanode的存储目录
- dfs.data.dir 指定namenode的检查点目录
MapReuduce的关键属性:
- jobtracker的属性一般是:主机:端口号(默认8021)
Hadoop的守护进程的地址和端口
Hadoop一般同时运行RPC和HTTP两个服务器,RPC服务器支持守护进程间的通信,HTTP服务器则提供与用户交互的Web页面 需要给各个服务器配置网络地址和端口号
- HTTP服务器属性
计算机集群的结构:
普通的文件系统只需要单个计算机节点就可以完成文件的存储和处理,单个计算机节点由处理器、内存、高速缓存和本地磁盘构成。
分布式文件系统把文件分布存储到多个计算机节点上,成千上万 的计算机节点构成计算机集群
集群中的计算机节点存放在 机架(Rack)上,每个机架可以存放8~64个节点,同一机架上的不同 节点之间通过网络互连(常采用吉比特以太网),多个不同机架之间采用另一级网络或交换机互连。
文件系统结构
文件块会被复制为多个副 本,存储在不同的节点上,而且存储同一文件块的不同副本的各个节 点会分布在不同的机架上,这样,在单个节点出现故障时,就可以快 速调用副本重启单个节点上的计算过程,而不用重启整个计算过程, 整个机架出现故障时也不会丢失所有文件块
体系结构:
MapReduce
介绍一些mapreduce的特点:
-
一次写入多次读取: MapRuduce实际上有点暴力,每个查询需要处理整个数据集(至少很大一部分)。反过来想,这也正是它的优势所在。 -
MapReduce是一个批量查询处理器,能够在合理的时间范围内处理针对整个数据集的即时查询。以前的数据存储在磁盘上,要读取数据需要取寻址,会耗费大量时间。
既然说到大数据的存储,那为什么不用关系型数据库啊?MYSQL的DBMS不是挺好的么?
其实数据读取(select)的时间开销主要在寻址,读取大量的数据集时间开销不可避免的增加,这种流式读取的速率,主要取决于传输速率。
补充一下什么是流式读取(我的个人理解):
场景:首先你写一个mysql语句 select stu.name from Students ,执行 , 然后mysql的内核就去数据库里查询数据,将查询的结果返回存储在内存里面,然后读取出来。
- 直接全部返回结果,这应该是正常的操作。那么如果现在数据量非常非常大,你觉得能正常的一次性返回么?是,如果你这样执行也不会报错,我猜你电脑八成会死机。为什么呢,内存会溢出,你的肚皮只有那么大,吃饱了就会吃不下,然后就歇菜了!
- 将数据切成一小块一小块的,然后分批次的返回给内存,也就是说,饭要一口一口吃! 流式数据访问就像勤快的小弟,来了一点数据,就立马处理掉,立马分发到各个存储节点来响应分析、查询。
- 与之相反的是,非流式数据访问,就像是职场混了多年的老油条,来了一点数据,懒得处理,等堆成一堆,再一起处理。等堆成一堆,处理完,再分发到各个存储节点,响应分析、查询。
- 最简单的例子:
回到MapReduce,所以啊如果是处理小规模的数据关系型数据库没毛病very good ,但是要操作大规模数据,MapReduce是爸爸,因为爸爸可以排序、组合重建数据库
对比一下:
-
MapReduce对非结构化和半结构化数据非常有效 结构化数据:具有既定格式的实体化数据,例如 XML文件 半结构化数据:可能有格式,但经常被忽略。比如excel的具有单元格的网格结构,但是单元格里面的数据格式是任意的 非结构化数据:没有内部结构,比如纯文本和图像数据 -
Web服务器日志:每次都会记录客户端主机的数据操作记录 -
MapReduce是一个可伸缩的编程模型。码农只需要写两个函数,map函数和reduce函数(每个函数定义了一个键值对集合到另一个键值对集合的映射),且无需关注数据集和所用集群的大小。举个例子,输入的数据量是原来的两倍,作业花费的时间也是两倍;但如果是花费的集群是原来的两倍,作业时间不变! -
数据本地化:MapReduce会尽量在节点上存储数据,以实现数据的本地快速访问,它通过显式网络拓扑结构尽力保留网络带宽。 -
无共享:码农无需考虑系统部分失效问题,mapreduce会自动检测到失败的map和reduce任务并且让正常的机器去重新运行这些任务。
HDFS
- 管理网络中跨多台计算机存储的文件系统成为分布式文件系统,Hadoop Distributed FileSystem
特点:
- 超大文件
- 流式数据访问
- 可能高时间延迟的数据访问(高数据吞吐量)
- 大量的小文件
- 单用户写入
数据块:
每个磁盘都有默认的数据块大小,这是磁盘进行数据读写的最小单位。 文件系统一般为几千字节 ,磁盘块一般为512字节,HDFS默认为64MB。
为什么HDFS的块这么大?
- 最小化存储开销
- HDFS的文件被划分为块大小的多个分块,HDFS中小于一个块大小的文件不会占满整个块的空间;
- 一个文件的大小可以大于网络中任意一个磁盘的容量,因为文件的所有的块可以存储在集群上任意一个磁盘上进行存储
- HDFS的存储单元使用块进行存储,而不是文件。便于简化存储管理
块抽象的好处:
● 支持大规模文件存储。文件以块为单位进行存储,一个大规模 文件可以被分拆成若干个文件块,不同的文件块可以被分发到不同的 节点上,因此一个文件的大小不会受到单个节点的存储容量的限制, 可以远远大于网络中任意节点的存储容量。 ● 简化系统设计。首先,大大简化了存储管理,因为文件块大小 是固定的,这样就可以很容易计算出一个节点可以存储多少文件块; 其次,方便了元数据的管理,元数据不需要和文件块一起存储,可以 由其他系统负责管理元数据。 ● 适合数据备份。每个文件块都可以冗余存储到多个节点上,大 大提高了系统的容错性和可用性。
Nodes
NameNode:
-
管理文件系统的命名空间,它维护着文件系统树及整棵树内所有的文件和目录 信息以两个文件形式永久保存在本地磁盘:命名空间镜像文件和编辑日志文件即FsImage和EditLog
- FsImage用于维护文件系统树以及文件树中所有的 文件和文件夹的元数据
- 操作日志文件EditLog中记录了所有针对文件 的创建、删除、重命名等操作
-
记录着每个文件中各个块所在的数据节点的信息,但并不保存块的位置信息 -
名称节点在启动时,会将 FsImage 的内容加载到内存当中,然后执 行 EditLog 文件中的各项操作,使得内存中的元数据保持最新;名称节点启动成功并进入正常运行状态以后,HDFS中的更新操作 都会被写入到 EditLog,而不是直接写入 FsImage
客户端通过namenode和datanode的交互来访问整个文件系统
Secondary NameNode:
有效解决EditLog逐渐变大带来的问题,如果 EditLog很大,就会导致整个过程变得非常缓慢,使得名称节点在启动 过程中长期处于“安全模式”,无法正常对外提供写操作,影响了用户 的使用
- 可以完成EditLog与 FsImage的合并操作,减小EditLog文件大小,缩短名称节点重启时间
- 可以作为名称节点的“检查点”,保存名称节点中的元数据信息
- EditLog与FsImage的合并操作,用最新的FsImage文件去替换旧的FsImage文 件,同时用 EditLog.new 文件去替换 EditLog 文件
- 周期性地 备份名称节点中的元数据信息,当名称节点发生故障时,就可以用第 二名称节点中记录的元数据信息进行系统恢复
- ,如果名称节点在t 1 时刻和t 2 时刻期间发生故障,系统就会丢失 部分元数据信息,在HDFS的设计中,也并不支持把系统直接切换到第二名称节点,并不能起到“热备份”作用
通信协议
所有的HDFS通信协议都是构建在TCP/IP协议基础 之上的
-
Client -》 NameNode :客户端通过一个可配置的端口向名称节点主动发起TCP连接, 并使用客户端协议与名称节点进行交互 -
NameNode -》 DataNode :使用数据节点协议进行交互 -
Client -》dataNode :客户端与数据节点的交互是通过 RPC(Remote Procedure Call)来实现的。在设计上,NameNode不会主动发起RPC,而是响应来自Client和DataNode的RPC请求
datanode: 文件系统的工作节点,它们根据需要存储并检索数据块(受客户端和namenode调度),并且定期向namenode发送它们所存储的块的列表
- 每个数据节点会周期性地向名称节点发送“心跳”信息,报 告自己的状态,没有按时发送心跳信息的数据节点会被标记为“宕 机”,不会再给它分配任何I/O请求。
容错机制
第一种机制:
备份那些组成文件系统元数据持久状态的文件。namenode在多个文件系统上保存元数据的持久状态,实时同步的。另外,写入一个远程挂载的网络文件系统NFS
通常一个数据块的多个副本会被分布到不同的数据节点上
好处:
- 加快数据传输速度,分块后可并行读取
- 容易检查数据错误
- 保证数据的可靠性。即使某个数据节点出现故障失效,也不
会造成数据丢失。
第二种机制:
运行辅助的namenode,作用是定期通过编辑日志并合并命令空间镜像,以防止编辑日志过大。
数据流
客户端读取 HDFS的数据
客户端写 HDFS的数据
网络拓扑
HDFS默认每个数据节点都是在不同的机架上
HDFS 默认的冗余复制因子是 3,每一个文件块会被同时保存到 3 个地方,其中,有两份副本放在同一个机架的不同机器上面,第三个 副本放在不同机架的机器上面,这样既可以保证机架发生异常时的数据恢复,也可以提高数据读写性能
- (1)如果是在集群内发起写操作请求,则把第一个副本放置在发 起写操作请求的数据节点上,实现就近写入数据。如果是来自集群外部的写操作请求,则从集群内部挑选一台磁盘不太满、CPU不太忙的数据节点,作为第一个副本的存放地。
- (2)第二个副本会被放置在与第一个副本不同的机架的数据节点上。
- (3)第三个副本会被放置在与第一个副本相同的机架的其他节点上。
- (4)如果还有更多的副本,则继续从集群中随机选择数据节点进行存放
读取优先级:
- 同一节点的进程
- 同一机架的不同节点
- 同一数据中心中不同机架的节点
- 不同数据中心中节点
数据完整性
-
hdfs会对写入的文件计算循环冗余校验和,并在验证读取数据时进行验证 -
datanode负责验证收到的数据后存储数据以及其校验和 -
客户端从datanode读取数据时,也验证校验和,每个datanode都持久保存有一个用于验证的校验和日志 -
hdfs存储着每个数据块的副本,因此它可以通过复制完好的数据副本来修复损坏的数据块。
HDFS基本操作
-
显示块信息:hadoop fsck / -files -blocks -
本地文件复制到hdfs:hadoop fs -copyFromLocal input/docs/quangle.txt hdfs://local/user/tom/quangle.txt -
主机的url在core-site中指定 -
检查文件是否一致:md5 input/docs/quangle.txt quangle.txt hadoop fs [genericOptions][commandOptions] -
进入离开安全模式: hadoop dfsadmin -safemode get hadoop dfsadmin -safemode wait/enter/leave -
关于dfsadmin: -
fsck工具:检测hadoop文件的健康状况 hadoop fsck /
- -delete :删除
- -move : 移动到/lost+found 目录下
- -files :显示第一行信息
- -blocks:选项描述各个文件中各个块的信息
- -racks:显示各个块的机架位置和datanode的地址
-
开启均衡器:start-balance.sh
其他具体如下。
● hadoop fs -ls
。显示
指定的文件的详细信息。
● hadoop fs -ls -R
。ls命令的递归版本。
● hadoop fs -cat
。将
指定的文件的内容输出到标准输
出(stdout)。
● hadoop fs -chgrp [-R]group
。将
指定的文件所属的组 改为group,使用-R对
指定的文件夹内的文件进行递归操作。这
个命令只适用于超级用户。
● hadoop fs -chown [-R][owner][: [group]]
。改变
指定的 文件的拥有者,-R用于递归改变文件夹内的文件的拥有者。这个命令
只适用于超级用户。
● hadoop fs -chmod [-R]
。将
指定的文件的权限
更改为。这个命令只适用于超级用户和文件的所有者。
● hadoop fs -tail [-f]
。将
指定的文件最后1KB的内容输 出到标准输出(stdout)上,-f选项用于持续检测新添加到文件中的内
容。
● hadoop fs -stat [format]
。以指定的格式返回
指定的文
件的相关信息。当不指定format的时候,返回文件
的创建日期。
● hadoop fs -touchz
。创建一个
指定的空文件。
● hadoop fs -mkdir [-p]。创建指定的一个或多个文件
夹,-p选项用于递归创建子文件夹。
● hadoop fs -copyFromLocal 。 将 本 地 源 文 件 复制到路径指定的文件或文件夹中。
● hadoop fs -copyToLocal [-ignorecrc][-crc]。将目 标文件复制到本地文件文件夹中,可用-ignorecrc选项复制CRC校验失败的文件,使用-crc选项复制文件以及CRC信息。
● hadoop fs -cp 。将文件从源路径复制到目标路径 。● hadoop fs -du
。显示
指定的文件或文件夹中所有文件的大小。
● hadoop fs -expunge。清空回收站,请参考HDFS官方文档以获取
更多关于回收站特性的信息。
● hadoop fs -get [-ignorecrc][-crc]。复制指定的 文件到本地文件系统指定的文件或文件夹,可用-ignorecrc选项复制CRC校验失败的文件,使用-crc选项复制文件以及CRC信息。
● hadoop fs -getmerge [-nl]。对指定的源目录中 的所有文件进行合并,写入指定的本地文件。-nl是可选的,用于指定在每个文件结尾添加一个换行符。
● hadoop fs -put 。从本地文件系统中复制 指定的单个或多个源文件到指定的目标文件系统中,也支持从标准输入(stdin)中读取输入写入目标文件系统。
● hadoop fs -moveFromLocal 。与put命令功能相同,但是文件上传结束后会从本地文件系统中删除指定的文件。
● hadoop fs -mv 。将文件从源路径移动到目标路 径。 ● hadoop fs -rm
。删除
指定的文件,只删除非空目录和文件。
● hadoop fs -rm -r
。删除
指定的文件夹及其下的所有文件,-r选项表示递归删除子目录。
● hadoop fs -setrep [-R]
。改变
指定的文件的副本系数,-R选项用于递归改变目录下所有文件的副本系数。
● hadoop fs -test -[ezd]
。检查
指定的文件或文件夹的相关信息。不同选项的作用如下。
* -e检查文件是否存在,如果存在则返回0,否则返回1。
* -z检查文件是否是0字节,如果是则返回0,否则返回1。
* -d如果路径是个目录,则返回1,否则返回0。
● hadoop fs -text
。将
指定的文件输出为文本格式,文件的格式允许是zip和TextRecordInputStream等。
作业历史
作业历史包括已完成作业的事件和配置信息。 作业历史可以用来实现jobtrack本地文件系统的history子目录中,设置在hadoop.job.history.location,历史文件保存30天
- 作业输出目录——logs/history子目录为用户存放第二个备份。
作业页面:
运行期间,可以在作业页面监视作业进度,页面会定期更新。
摘要信息:
- 显示map和reduce的进度
- Num Task 显示作业map和reduce的总数
- 其他列显示的是这些任务的状态:Pending(等待运行),Running,Complete(成功完成),Killed/Failed(失败)
- 最后一列显示的是一个作业的所有map和reduce任务中失败和中止的task attempt 总数
结果
每个reducer产生一个输出文件,假设目录中有30个部分文件,命名为part-00000-part-00029
将部分文件合成为一个单独的文件:hadoop fs -getmerge temp temp-local
- reducer输入的文件是无序的,因为是hash partitioner,所以需要排序输出
sort temp-local | tail
如果输出文件比较小,另外一种方式:hadoop fs -cat temp/*
Hadoop日志
作业调优
如何运行自己编写的MapReduce程序:
提示:这里可以添加要学的内容
例如:
- map函数
- reduce函数
- main函数
- 打jar包
上传hdfs:
hadoop dfs -put test.jar /
在hdfs上运行jar:
hadoop jar test.jar test.test1Driver
hadoop jar 自己打的jar包名称/路径 运行程序的主类(Driver类)路径
打包上传教程
MapReduce的工作流:
如何将数据处理问题转化为MapReduce模型
例如:
用mapreduce来分解:
- 计算每对station-date的每日最高气温 得到 (station-data , 最高气温)
- 计算每个station-day-station键的每日最高气温的均值
把station-date里面的年份丢掉,变成(station-day-month,最高气温),然后reducer去计算每个station-day-month键的均值
运行独立的作业
如何让作业顺序的执行?是否有一个线性的作业链或者一个更复杂的作业有向无环图
线性链表:
JobClient.runJob(conf1)
JobClient.runJob(conf2)
如果一个作业失败,runJob就会抛出一个IOException,这样一来管道中后面的作业就无法执行了。
更复杂的结构使用org.apache.hadoop.mapred.jobcontrol
MapReduce作业运行机制:
一行代码就可以运行MapReduce作业:JobClient.runJob(conf)
整个过程:
- 客户端:提交MapReduce作业
- jobtracker:协调作用的运行。jobtracker是一个java的应用,它的主类是JobTracker
- tsktracker:运行作业划分后的任务。java应用程序
- 分布式文件系统(一般是HDFS),用来在其他实体间共享作业文件。
作业的提交
Jobclient的runJob方法用于新建jobclient实例并且调用submitJob()的便捷方式。
提交作业后,runjob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告给控制台。作业完成后,如果成功,就显示作业计数器;如果失败,导致作业失败的错误被记录到控制台。
submitJob()所实现的作业提交过程如下:
- mapred.submit.replication 控制作用JAR的副本数,默认为10
作业的初始化
当JobTracker接收到对其submitJob方法的调用后,会把此调用放入一个内部队列当中,交由作业调度器进行调度,并对其初始化。
- 初始化包括创建一个表示正在运行作业的对象—用于封装任务和记录信息,以便跟踪任务的状态和进程。
创建任务运行列表:
- 作业调度器首先从共享文件系统中获取JobClient以计算好的输入分片信息
- 为每个分片创建一个map任务
- 调度器创建相应数量的要运行的reduce任务,任务在此时被指定ID
- 创建的任务的数量有JobConf的mapred.reduce.task 属性决定,它是用setNumReduceTasks()方法来设置的
任务的分配
- tasktracker运行一个简单的循环来定期发送“心跳”给jobtracker.
- 心跳告知jobtracker,tasktracker是否还存活,同时也充当两者之间的消息通道。
- 作为心跳的一部分,tasktracker会指明它是否已经准备好运行新的任务;如果是,jobtracker会为分配一个它分配一个任务,并使用心跳的返回值与tasktracker进行通信
在jobtracker为tasktracker选择任务之前,jobtracker必须选定任务所在的作业 如何选任务?(有调度算法)默认的方法是简单维护一个作业优先级列表,一旦选择好作业,jobtracker就可以为该作业选择一个任务。
对于map任务和reduce任务,tasktracker有固定数量的任务槽
Example: tasktracker可能可以同时运行两个map和两个reduce。准确数量由tasktracker核的数量核内存大小决定。
默认:
- 调度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,tasktracker至少由一个空闲的map任务槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。
- 选择reduce任务,直接从reduce任务列表里面拿下来一个就行
- 选择map任务,jobtracker要考虑tasktracker网络位置,优先级:
任务执行
假设tasktracker已经被分配了一个任务,下一步是运行该任务。
- 第一步,通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系统,从而实现作业的JAR文件本地化。同时,tasktracker将应用文件中所需要的所有文件从分布式缓存复制到本地磁盘
- tasktracker为任务新建一个本地的工作目录,并且把JAR文件中的内容解压到这个文件夹里面
- tasktracker新建一个TaskRunner实例来运行该任务
- TaskRunner启动一个新的JVM来运行每个任务,以便用户定义的map核reduce函数的任何软件问题都不会影响到tasktracker;但是在不同的任务之间重用JVM还是有可能的
- 子进程通过umbilical接口与父进程进行通信,任务的子进程每隔几秒便告知父进程它的进度,直到任务完成
Streaming和Pipes
streaming和Pipes都运行特殊的map和reduce任务,目的是运行用户提供的可执行程序,并与之通信
Streaming中:
Pipes:
- 监听套接字socket,发送其环境中的一个端口号给C++进程,如此一来,在开始时候,C++进程即可建立一个与其父节点JAva Pipes任务的持久化套接字连接
进度和状态的更新
MapReduce作业是长时间运行的批量作业,运行时间范围从数秒到数小时。得知作业的进展是很重要的。
每个作业都有一个状态:
- 作业或任务的状态
- map和reduce的进度
- 作业计数器的值
- 状态消息或描述
状态信息与客户端的通信
进度追踪
map任务:任务进度是已处理输入所占比例 reduce任务:情况有点复杂,但是系统仍然会估计已处理reduce任务的比例
MapReduce中进度的组成 进度并不是总是可测量的,但是无论如何Hadoop有个任务正在运行。比如输出记录的任务也可以表示成进度,尽管不是百分比的形式
进度的所有操作:
- 读入一条输入记录
- 写入一条输出记录
- 在一个Reporter中设置描述状态
- 增加计数器(使用Reporter的incrCounter()方法)
- 调用Reporter
- 的progress任务
如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到tasktracker,有一个独立的线程,每隔三秒检查一次此标志,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒发送心跳到jobtracker(5秒是最小值),心跳间隔实际上是由集群的大小来决定的,对于一个更大的集群,间隔会长一些 tasktracker运行的所有任务的状态都会在调用中被发送至jobtracker,计数器间隔少于5秒,因为计数器占用的带宽相对较高
JobClient通过每秒查询jobtracker来接收最新状态。JobClient的getJob()方法来得到一个RunningJob的实例
作业的完成
jobtracker收到最后一个作业完成,便把作业状态设置为“成功”。JobClient查询状态时,便知道任务已经完成,JobClient打印一条消息告知用户,从runJob返回。
最后jobtracker清空工作的状态,指示tasktracker也清空工作状态。
作业的失败
- 任务失败:提交失败报告,tasktracker启动心跳备用
- task失败:心跳停止发送,jobtracker调度其他的tasktracker来执行任务
- job失败:单点故障
作业调度
- 早期:FIFO调度算法,先进先出策略
- 加入优先级:mapred.job.priority属性或JobClient的setJobPriority()方法来设置优先级,但是在FIFO算法中不支持抢占
- 多用户调度器Fair Scheduler , Capacity Schedule
Fair Schedule
目标:让每个用户公平地共享集群能力 。如果只有一个作业,它会得到集群的全部资源。随着提交的作业越来越多,空闲的任务会以公平共享集群。
- 作业都被放在作业池中,默认情况下,每个用户都有自己的作业池
- 提交作业数超过超过另外一个用户的用户,不会因此获得比后者更多的集群资源,可以用map和reduce任务槽数,来定制作业池的最小容量,还可以设置任务池的权重
- 支持抢占
如何使用?
- 需要将其jar文件放在Hadoop的类路径下,也就是将它从Hadoop的contrib/fairscheduler目录复制到lib目录
- 将mapred.jobtracker.taskSchedule - > org.opache.hadoop.mapred.FairScheduler
Capacity Schedule
- 集群由很多队列组成,这些队列可能是层次结构的,每个队列有一个分配能力。
- 每个队列内部,FIFO调度
Shuffle & Sort
MapReduce确保每个reducer输入都是按键排序的,map的输出传给给reduce作为输入,中间这个过程称为shuffle
map端
map函数产生输出时,并不是简单的将它写入到磁盘! 这个过程有点复杂,先用缓冲的方式写入到内存,并出于效率的考虑进行预排序
- 每个reduce任务都有一个环形内存缓冲区,用于储存任务的输出,默认情况下缓冲区的大小为100MB,此值可以通过io.sort.mb任务来调整。
- 一旦缓冲区内容达到阈值(0.8)以上,一个后台进程就会开始把内容写入spill磁盘
- 写入磁盘过程中,map输出不会停止,如果满了,map将会阻塞
- 写磁盘将按照轮询的方式写到mapred.local.dir属性指定的作业特定子目录中。
过程要点:
- 写入磁盘前,线程会根据数据最终要传送到的reducer把数据划分为相应的分区,每个分区中后台都按键排序
- 内存缓冲区达到溢出阈值后,就新建一个溢出写文件,map任务结束之前,会有多个溢出写文件,这些文件要被合并成一个已分区且已排序的输出文件,默认依次合并10个流
io.sort.factor - 如果指定combiner,并且溢出写次数>3,conbiner就会在输入文件写入磁盘之前执行,conbiner反复执行不影响结果
- conbiner的意义在于使map更加紧凑,写到本地磁盘的数据更少
- 压缩map输出:让磁盘写的更快,节约磁盘空间,
mpred.map.output.compression.codec 为true - map输出文件位于运行map任务的tasktracker的本地磁盘
reduce端
reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件,每个map任务完成的时间不一样,只要有一个map任务完成,reduce任务就开始复制map的输出文件(并行获取map输出,默认5个)
过程要点:
- 复制阶段:先复制map输出文件
- 排序(合并)阶段:合并map输出文件,并维持其顺序排序,循环进行的
假设50个map文件,合并因子(merge factor =10),那么合并进行5趟,每趟合并10个文件,最后形成10个中间文件
实际上没有这么完美的安排!!! 目标是合并最小数量的文件一遍满足最后一趟的合并系数
假设40个文件,运行起来绝对不是4 * 10的安排,而是第一天合并4个文件,随后3趟合并10个文件,在最后一趟(第4趟)4个已经合并的文件会和余下的6个合并,合计10个。总体上看好像是4 * 10
why? 这是优化措施,不改变合并次数的情况下,尽量减少写到磁盘的数据量,因为最后一趟无论如何都要直接合并到reduce(4和6顺带在最后一趟写入,少一次合并IO)
10+10+10+10 = 40 - > IO(40) (4)+10+10+10 +( 6) = 40 - > IO(30)
配置调优
原则:给shuffle过程分配尽量多的提高内存空间
问题:map函数和reduce函数能够得到足够的内存来运行(编写函数时少用内存)
运行map和reduce任务的JVM,内存大小在mapred.child.java.opts 属性设置
- map端,避免多次溢出写磁盘来获得最佳性能
- reduce端,中间数据全部驻留在内存时,就能获得最佳性能(不可能!)
- Hadoop默认使用4KB的缓冲区,
io.file.buffer.percent 中设置
- reduce
任务的执行
推测执行
MR模型将作业分解成任务,然后并行的运行任务使得作业的整体时间少于各个任务执行的时间
任务执行缓慢有很多原因,但是检测具体原因很困难,Hadoop不会去诊断和修复缓慢任务,相反,有个任务比预期的时间慢,它会尽量检测但同时它会启动另一个相同的任务作为备份。
策略:
- 同时启动两个相同的任务,它们会互相竞争,导致推测执行无法进行,集群资源浪费
- 只有在一个作业的所有任务都启动之后,才启动推测执行的任务(落后一点,不要同时),尤其针对已经运行一段时间后且比作业中其他任务慢的任务。一个任务成功完成后,其他的推测执行任务全部中止
- 原任务比推测执行任务快,推测执行任务扔掉;推测执行任务比原任务快,原任务扔掉
推测执行的目的时减少作业的时间,但这是以集群的效率为代价的,推测执行会减少整个的吞吐量,因为冗余任务的执行会减少作业执行时间
任务JVM重用
Hadoop在自己的java虚拟机上运行任务,以区分其他正在运行的任务。每个任务启动一个新的JVM将耗时1秒。对于小任务1s微不足道,大量短map任务你就知道错了。
如果有大量短任务的或者初始化时间长的作业,能够对后续任务重用JVM就可以体现出性能上的优势。
启动任务重用JVM后,任务不会同时运行在一个JVM上。JVM顺序执行各个任务,但是tasktracker可以一次性执行多个任务,都是在独立的JVM内运行的。 我们可以让同一作业中的不同任务共享一个JVM,数量不限
共享JVM的另一个好处是,作业之间的状态共享,通过共享静态字段中存储的相关数据,任务可以快速访问共享数据
跳过坏记录
- 检测坏记录并忽略,或者抛出异常中止运行
- 处理坏记录在于mapper和reducer代码
- 任务失败后两次,启动skipping mode
- 一次task attempt ,skipping mode只能检测出一个坏记录,所以要增加task attempt的次数
- 坏记录保存在_logs/skip子目录下的作业输出文件中(用
hadoop fs -text 诊断)
任务执行环境
执行环境的属性:
MapReduce的类型和格式
- map格式:map:(k1,v1)- > list(k2,v2)
- conbine格式:conbine:(k2,list(v2)) - > list(k2,v2)
- reduce格式:reduce( k2 , list(v2) ) - > list( k3, v3)
一般来说,map函数的输入的键值类型(k1,v1)相同于输出类型(k2,v2)
partition函数将中间的键值对(k2,v2)进行处理,并且返回一个分区索引(partition index),分区由键决定
partition : (k2,v2) -> interger
public interface Partition<K2,V2> extends JobConfigurable{
int getPartition(K2 key, V2 value , int numPartitions) ;
}
默认的输入格式是TextInputFormat ,它产生的键的类型是LongWritable(文件中每行中开始的偏移量),值的类型是Text文本行,最后输出的整数是行偏移量。
默认的mapper是IdentityMapper ,它将原封不动地写在输出中
public class IdentityMapper<k,v>extends MapReduceBase implments Mapper<k,v,k,v>{
public void map(K key ,V value,
OutputCollecctor <K,V> output , Reporter reporter)
throws IOException{
output.collect(key,value)
}
}
IdentityMapper是一个泛型类型,它可以接受任何键
默认的partitioner是HashPartitioner,它对记录的键进行哈希操作以决定该记录属于哪个分区。每个分区对应一个reducer任务,所以分区个数等于作业的reducer个数
public class HashPartitioner<k2,v2> implments Partition<k2,v2>{
public void configure(JobConf job){}
public int getPartition(K2 key , V2 value, int numPartitions){
return (key.hashcode()& Integer.Max_VALUE)%numPartitions;
}
}
键的哈希码被转换成一个非负整数,它由哈希值与最大的整形值做一个按位与操作而获得的
|