IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop -> 正文阅读

[大数据]Hadoop

文章目录

Hadoop (一)

1、分布式文件系统

1、FS File System

? 文件系统时极域硬盘之上的文件管理的工具

? 我们用户操作文件系统可以和硬盘进行解耦

2、DFS Distributed File System

? 分布式文件系统

? 将我们的数据存放在多台电脑上存储

? 分布式文件系统有很多,HDFS(Hadoop Distributed FileSyetem)是Hadoop自带的分布式文件系统

? HDFS是mapreduce计算的基础

2、文件切分的思想(引出分而治之的思想 第一个核心思想)

? a. 文件存放在一个磁盘上效率肯定是最低的

? 读取效率低

? 如果文件特别大会超出单机的存储范围

? b. 字节数组

? 文件在磁盘真实存储文件的抽象概念

? 数组可以进行拆分和组装,源文件不会收到影响

? c. 切分数据

? 对字节数组进行切分

? d. 拼接数据

? 按照数组的偏移量将数据连接到一起,将字节数组连接到一起

? e. 偏移量

? 当前数据在数组中的相对位置,可以理解为下标

? 数组都有对应的索引,可以快速定位数据

? f. 数据存储的原理:

? 不管文件的大小,所有的文件都是由字节数组构成

? 如果我们要切分文件,就是将一个字节数组分成多份

? 我们将切分后的数据拼接到一起,数据还可以继续使用

? 我们需要根据数据的偏移量将他们重新拼接到一起

3、Block拆分标准

数据块Block

? a. 是磁盘进行数据 读/写的最小单位,数据被切分后的一个整体被称之为块
? b. 在Hadoop 1默认大小为64M,在Hadoop 2及其之后默认大小为128M块,这么大是为了最小化寻址开销
? c. 同一个文件中,每个数据块的大小要一致除了最后一个节点外
? 不同文件中,块的大小可以不一致
? 文件大小不同可以设置不同的块的数量
? HDFS中小于一个块的大小的文件不会占据整个块的空间
? d. 真实情况下,会根据文件大小和集群节点的数量综合考虑块的大小
? e. 数据块的个数=Ceil(文件大小/每个块的大小)

拆分的数据块需要等大(面试题)

? a. 数据计算的时候简化问题的复杂度(否则进行分布式算法设计的时候会因为数据量不一很难设计)
? b. 数据拉取的时候时间相对一致
? c. 通过偏移量就知道这个块的位置
? d. 相同文件分成的数据块大小应该相等

注意事项

? a. 只要有任意一个块丢失,整个数据文件被损坏
? b. HDFS中一旦文件被存储,数据不允许被修改
? 修改会影响偏移量

修改会导致数据倾斜(单节点数据量过多)

? 修改数据会导致蝴蝶效应
? c. 但是可以被追加(一般不推荐)
? 追加设置需要手动打开
? d. 一般HDFS存储的都是历史数据.所以将来Map Reduce都用来进行离线数据的处理
? f. 块的大小一旦文件上传之后就不允许被修改 128M-512M

4、Block数据安全

? a. 只要有任意一个块丢失,整个数据文件被损坏
? b. 肯定要对存储数据做备份
? c. HDFS是直接对原始数据进行备份的,这样能保证恢复效率和读取效率
? d. 备份的数据肯定不能存放在一个节点上,使用数据的时候可以就近获取数据
? f. 备份的数量要小于等于节点的数量
? g. 每个数据块默认会有三个副本,相同副本是不会存放在同一个节点上
? h. 副本的数量可以变更
? 可能近期数据被分析的可能性很大,副本数可以多设置几个
? 后期数据很少被分析,可以减少副本数

5、Block的管理效率

需要专门给节点进行分工

  • 存储 DataNode 实际存储数据的节点
  • 记录 NameNode
  • 日志 Secondary NameNode

在这里插入图片描述

6、Hadoop中的常见的shell命令

1、如何将linux本地的数据上传到HDFS中呢?
hadoop fs -put 本地的文件 HDFS中的目录
hdfs dfs -put 本地的文件 HDFS中的目录

2、如何创建HDFS中的文件夹呢?
需求:想创建/shujia/bigdata17
hadoop fs -mkdir /shujia/bigdata17
hdfs dfs -mkdir /shujia/bigdata17

3、查看当前HDFS目录下的文件和文件夹
hadoop fs -ls /shujia/bigdata17
hdfs dfs -ls /shujia/bigdata17

4、将HDFS的文件下载到Linux本地中
hadoop fs -get HDFS中的文件目录 本地要存放文件的目录
hdfs dfs -get HDFS中的文件目录 本地要存放文件的目录

5、删除命令(如果垃圾回收站大小小于被删除文件的大小,直接被删除,不经过回收站)
hadoop fs -rm ....  # 仅删除文件
hadoop fs -rmr .... # 删除文件夹


6、移动文件
hadoop fs -mv 目标文件  目的地路径

7、HDFS内部复制文件
hadoop fs -cp [-p] ... ... # 如果想复制文件夹,加上-p参数

7、强制格式化集群(遇到问题的简单暴力的方法)

1、停止正在运行的集群

? stop-all.sh

2、删除所有节点hadoop根目录中的tmp文件夹

3、在主节点(master)中hadoop的根目录中的bin目录下,重新格式化HDFS

./hdfs namenode -format

4、启动集群

? start-all.sh

8、进程理解

8.1、HDFS相关(NN,DN,SSN)

NameNode(NN)

在这里插入图片描述

功能:

? 1、接受客户端的读/写服务

? 因为NameNode知道数据文件与DataNode的对应关系

? 2、保存文件的时候会保存文件的元数据信息

? a. 文件的归属

? b. 文件的权限

? c. 文件的大小,时间

? d. Block信息,但是block的位置信息不会持久化,需要每次开启集群的时候DN向NN汇报。

? 3、收集Block的位置信息

? 3.1 系统启动

? a. NN关机的时候是不会存储任何的Block与DataNode的映射信息的

? b. DN启动的时候会自动将自己节点上存储的Block信息汇报给NN

? c. NN接收请求之后会重新生成映射关系

? File ----> Block

? Block—> DN

? d. 如果数据块的副本数小于设置数,那么NN会将这个副本拷贝到其他节点

? 3.2 集群运行中

? a. NN与DN保持心跳机制,三秒钟发送一次

? b. 如果客户端需要读取或者上传数据的时候,NN可以知道DN的健康情况

? c. 可以让客户端读取存活的DN节点

? d. 如果NN与DN三秒没有心跳则认为DN出现异常,此时不会让新的数据写到这个异常的DN中,客户端访问的时候不提供异常DN节点地址

? e. 如果超过十分钟没有心跳,那么NN会将当前DN节点存储的数据转移到其他的节点

? 4、NameNode为了效率,将所有的操作都在内存中进行

? a. 执行速度快

? b. NameNode不会和磁盘进行任何的数据交换

? 但是会存在两个问题:

? 1)数据的持久化

? 2)数据保存在内存中,断电丢失

DataNode(DN)

1、存放的是文件的数据信息,以及验证文件完整性的校验信息

2、数据会存放在硬盘上

? a. 1m=1条元数据

? b. 1G=1条元数据

? c. NameNode非常排斥存储小文件(能存,但是不推荐!!面试必问)

? 一般小文件在存储之前需要进行压缩

3、汇报

? 1)启动时

? 汇报之前会验证Block文件是否被损坏

? 向NN汇报当前DN上block的信息

? 2)运行中

? 向NN保持心跳机制

4、当客户端读写数据的时候,首先会先去NN查询file与block与DN的映射,然后客户端直接与DN建立连接,然后读写数据

SecondaryNameNode(SNN)

1、传统的内存持久化方案

? 1)日志机制

? a. 做任何操作之前先记录日志

? b. 在数据改变之前先记录对应的日志,当NN停止的时候

? c. 当我下次启动的时候,只需要重新按照以前的日志“重做一遍”即可

? 缺点:

? a. log日志文件的大小不可控,随着时间的发展,集群启动的时间会越来越长

? b. 有可能日志中存在大量的无效日志

? 优点:

? a. 绝对不会丢失数据

? 2)拍摄快照

? a. 我们可以将内存中的数据写出到硬盘上(序列化)

? b. 启动时还可以将硬盘上的数据写回到内存中(反序列化)

? 缺点:

? a. 关机时间过长

? b. 如果是异常关机,数据还在内存中,没法写入到硬盘

? c. 如果写出的频率过高,导致内存使用效率低

? 优点:

? 启动时间较短

2、SNN的解决方案(面试题)

? 1)解决思路

? a. 让日志大小可控(每64M)

? b. 快照需要定时保存(每隔1h)

? c. 日志+快照

? 2)解决方案

? a. 当我们启动一个集群的时候,会产生4个文件 …/name/current/

在这里插入图片描述

? b. 我们每次操作都会记录日志–>edits-inprogress- edits_00000001,随着时间的推移,日志文件会越来越大-当达到阈值的时候(64M或3600秒),会生成新的日志文件,edits_inprogress-000000001 -->edits_0000001,创建新的日志文件 edits_inprogress-0000000016。

在这里插入图片描述

在这里插入图片描述

9、安全模式

安全模式是 HDFS 的一种工作状态,处于安全模式的状态下,只向客户端提供文件的只读视图,不接受对命名空间的修改;同时 NameNode 节点也不会进行数据块的复制或者删除,
NameNode 启动时,
1)首先将镜像文件( fsimage )载入内存,并执行编辑日志( edits )中的各项操作。
2)一旦在内存中成功建立文件系统元数据的映射,则创建一个新的 fsimage 文件和一个空的编辑日志。
3)NameNode 开始监听 RPC 和 Http 请求。
4)此时 NameNode 处于安全模式,只接受客户端的读请求。

? 5)处于这个状态是为了保护数据的安全所以只能被客户端访问读取数据

# 对安全模式的理解
# 1.工作流程
	a.启动 NameNode,NameNode 加载 fsimage 到内存,对内存数据执行 edits log 日 志中的事务操作。
	b.文件系统元数据内存镜像加载完毕,进行 fsimage 和 edits log 日志的合并,并创 建新的 fsimage 文件和一个空的 edits log 日志文件。
	c.NameNode 等待 DataNode 上传 block 列表信息,直到副本数满足最小副本条件。
	d.当满足了最小副本条件,再过 30 秒,NameNode 就会退出安全模式。最小副本条件指 整个文件系统中有 99.9%的 block 达到了最小副本数(默认值是 1,可设置)
# 在 NameNode 安全模式(safemode)
	对文件系统元数据进行只读操作
	当文件的所有 block 信息具备的情况下,对文件进行只读操作
	不允许进行文件修改(写,删除或重命名文件)
# 2.注意事项
	a.NameNode 不会持久化 block 位置信息;DataNode 保有各自存储的 block 列表信息。 正常操作时,NameNode 在内存中有一个 blocks 位置的映射信息(所有文件的所有文 件块的位置映射信息)。
	b.NameNode 在安全模式,NameNode 需要给 DataNode 时间来上传 block 列表信息到 NameNode。如果 NameNode 不等待 DataNode 上传这些信息的话,则会在 DataNode 之间进行 block 的复制,而这在大多数情况下都是非必须的(因为只需要等待 DataNode 上传就行了),还会造成资源浪费。
	c.在安全模式 NameNode 不会要求 DataNode 复制或删除 block。
	d.新格式化的 HDFS 不进入安全模式,因为 DataNode 压根就没有 block。
# 4.命令操作
# 通过命令查看 namenode 是否处于安全模式:
hdfs dfsadmin -safemode get
Safe mode is ON HDFS 的前端 webUI 页面也可以查看 NameNode 是否处于安全模式。 有时候我们希望等待安全模式退出,之后进行文件的读写操作,尤其是在脚本中,此时:
`hdfs dfsadmin -safemode wait`
# your read or write command goes here 管理员有权在任何时间让 namenode 进入或退出安全模式。进入安全模式:
`hdfs dfsadmin -safemode enter`
Safe mode is ON 这 样 做 可 以 让 namenode 一 直 处 于 安 全 模 式 , 也 可 以 设 置 `dfs.namenode.safemode.threshold-pct` 为 1 做到这一点。 离开安全模式:
`hdfs dfsadmin -safemode leave`
Safe mode is OFF

系统中的数据块的位置并不是由 NameNode 维护的,而是以块列表的形式存储在 DataNode 中。
[root@node01 ~]# rm -rf /var/yjx/hadoop/full/dfs/name/current/*
[root@node01 ~]# scp -r
root@node02:/var/yjx/hadoop/full/dfs/namesecondary/current/*
/var/yjx/hadoop/full/dfs/name/current

安全模式下
a. 安全模式下,各个 DataNode 会向 NameNode 发送自身的数据块列表
b. 当 NameNode 有足够的数据块信息后,便在 30 秒后退出安全模式
c. NameNode 发现数据节点过少会启动数据块复制过程
如果 NN 收集的 Block 信息没有达到最少副本数,就会将缺失的副本 , 从有的 DN 上拷贝到其他 DN
a. dfs.replication.min=2
b. 但是默认最低副本数为 1
c. 在拷贝的过程中系统还是处于安全模式
安全模式相关命令
hadoop dfsadmin -safemode leave 强制 NameNode 退出安全模式
hadoop dfsadmin -safemode enter 进入安全模式
hadoop dfsadmin -safemode get 查看安全模式状态
hadoop dfsadmin -safemode wait 等待一直到安全模式结束

10、HDFS的权限

HDFS对权限的控制

? a. 只能防止好人做错事

? b. 不能防止坏人做坏事

但是告诉你是谁,他就认为你是谁!!

11、机架感知

机架感知是为了保证副本在集群中的安全性
我们需要将节点放在不同的DN节点上,节点也需要一定的考量
可靠性,可用性,带宽消耗
第一个节点:
集群内部(优先考虑和客户端相同的节点作为第一个节点)
集群外部(选择资源丰富且不繁忙的节点作为第一个节点)
第二个节点:
第二个节点选择与第一个节点不同机架的其他节点
第三个节点:
与第二个相同机架相同的其他节点
第N个节点:
与前面节点不重复的其他节点

第一种模式

在这里插入图片描述

第二种模式:

在这里插入图片描述

12、HDFS的读写流程(重点!)

12.1、写数据

写数据就是将客户端上的数据上传到HDFS

2.6.1 宏观过程!
在这里插入图片描述

在这里插入图片描述

1.客户端向HDFS发送写数据请求

hdfs dfs -put students.txt /shujia/

2. Filesystem通过rpc调用namenode的put方法

a. nn首先检查是否有足够的空间权限等条件创建这个文件,或者这个路径是否已经存在,权限

b. 有:NN会针对这个文件创建一个空的Entry对象,并返回成功状态给DFS

c. 没有:直接抛出对应的异常,给予客户端错误提示信息

3.如果DFS接收到成功的状态,会创建一个FSDataOutputStream的对象给客户端使用

4.客户端要向nn询问第一个Block存放的位置

? NN通过机架感知策略 (node1 node 2 node3)

5.需要将客户端和DN节点创建连接

pipeline(管道)

客户端 和 node1 创建连接 socket
node1 和 node2 创建连接 socket
node2 和 Node3 创建连接 socket
6.客户端按照文件块切分数据,但是按照packet发送数据
默认一个packet大小为64K,Block128M为2048个packet

7.客户端通过pipeline管道开始使用FDSOutputStream对象将数据输出

    1. 客户端首先将一个 packet 发送给 node1, 同时给予 node1 一个 ack 状态
    2. node1接受数据后会将数据继续传递给 node2, 同时给予 node2 一个 ack 状态
    3. node2接受数据后会将数据继续传递给 node3, 同时给予 node3 一个 ack 状态
    4. node3将这个 packet 接受完成后,会响应这个 ack 给 node2 为 true
    5. node2会响应给 node1 , 同理 node1 响应给客户端

8.客户端接收到成功的状态 , 就认为某个 packet 发送成功了,直到当前块所有的 packet 都发送完成

? 1. 如果客户端接收到最后一个 pakcet 的成功状态 , 说明当前 block 传输完成,管道就会被撤销

? 2. 客户端会将这个消息传递给 NN , NN 确认传输完成

? 1. NN会将 block 的信息记录到 Entry, 客户端会继续向 NN 询问第二个块的存储位置 , 依次类推

? block1 (node1 node2 node3)

? block2 (node1 node3 node6)

? …

? blockn(node1 node4 node6)

  1. 当所有的 block 传输完成后, NN 在 Entry 中存储所有的 File 与 Block 与 DN 的映射关系关闭
FsDataOutPutStream

2.6.2 微观过程(如何保证package发送的时候不出错呢?)

在这里插入图片描述

1.客户端首先从自己的硬盘中以流的形式将自己的数据读取到缓存中
2.然后将缓存中的数据以chunk(512B)和checksum(4B)的方式放入到packet(64k)

1. chunk:checksum=128:1
2. checksum:在数据处理和数据通信领域中,用于校验目的的一组数据项的和
3. Packet中的数据分为两类,一类是实际数据包,另一类是 header 包。
4. 一个 Packet 数据包的组成结构(分两类,一类是实际的数据包,另一类是header包。)

一个数据包的组成结构:

在这里插入图片描述

参数理解:

在这里插入图片描述

3.(默认生成的快,发送的慢)当packet满的时候添加到dataqueue
4.datastreamer开始从dataqueue队列上读取一个packet,通过FDSDataOPS发送到Poepleline
在取出的时候,也会将 packet 加入到 ackQueue, 典型的生产者消费者模式

? 客户端发送一个 Packet 数据包以后开始接收 ack ,会有一个用来接收 ack 的 ResponseProcessor 进
程,如果收到成功的 ack

1. 如果某一个 packet 的 ack 为 true, 那么就从 ackqueue 删除掉这个 packet
2. 如果某一个 packet 的 ack 为 false, 将 ackqueue 中所有的 packet 重新挂载到 发送队列 , 重新发送

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kMaf47Sy-1659443580908)(C:\Users\GAO\Desktop\笔记\笔记图片\HDFS微观写流程.png)]

最终DFS保存的数据格式:

在这里插入图片描述

12.2、读数据

在这里插入图片描述

1.首先客户端发送请求到 DFS ,申请读取某一个文件
2.DFS 去 NN 查找这个文件的信息 ( 权限 , 文件是否存在 )
如果文件不存在,抛出指定的错误
如果文件存在,返回成功状态
3.DFS 创建 FSDataInputStream 对象,客户端通过这个对象读取数据
4.客户端获取文件第一个 Block 信息 , 返回 DN1 DN2 DN8
5.客户端直接就近原则选择 DN1 对应的数据即可
6.依次类推读取其他块的信息,直到最后一个块 , 将 Block 合并成一个文件
7.关闭 FSDataInputStream
在这里插入图片描述

Hadoop (二)

一、Hadoop-HA

1.1 Hadoop1.x带来的问题

1、单点故障

? a. 每个群集只有一个NameNode,NameNode存在单点故障(SPOF)。
? b. 如果该计算机或进程不可用,则整个群集在整个NameNode重新启动或在另一台计算机上启 动之前将不可用
? c. 如果发生意外事件(例如机器崩溃),则在操作员重新启动NameNode之前,群集将不可 用。
? d. 计划内的维护事件,例如NameNode计算机上的软件或硬件升级,将导致群集停机时间的延 长
2、水平扩展 将来服务器启动的时候,启动速度慢

3、namenode随着业务的增多,内存占用也会越来越多 如果namenode内存占满,将无法继续提供服务

4、日志丢失问题

1.2 设计思路

1、hadoop2.x启用了主备节点切换的模式(1主1备)

2、当主节点出现异常的时候,集群直接将备用节点切换成主节点

  • ? 要求备用节点马上就要工作
  • ? 主备节点内存几乎同步 有

3、独立的线程对主备节点进行监控健康状态

4、需要有一定的选举机制,帮助我们确定主从关系

5、我们需要实时存储日志的中间件

1.3 组织架构

在这里插入图片描述

Active NameNode(ANN)

? a. 它的功能和原理的NN的功能是一样的

? b. 接受客户端请求,查询数据块DN信息

? c. 存储数据的元数据信息

? 数据文件:Block:DN的映射关系

? d. 工作

? 启动时:接受DN的block汇报

? 运行时:和DN保持心跳(3s,10m)

? e. 存储介质

? 完全基于内存

? 优点:数据处理效率高

? 缺点:数据的持久化(日志edits+快照fsimage)

Standby NameNode(SNN)

? a. Standby NameNode:NN的备用节点

? b. 他和主节点做同样的工作,但是它不会发出任何指令

? c. 存储:数据的元数据信息

? 数据文件:Block:DN的映射关系
? 它的内存数据和主节点内存数据几乎是一致的

? d. 工作:

? 启动时: 接受DN的block汇报
? 运行时: 和DN保持心跳(3s,10m)

? e. 存储介质:完全基于内存

? 优点:数据处理效率高

? 缺点:数据的持久化

# f. 合并日志文件和镜像

? 1)当搭建好集群的时候,格式化主备节点的时候,ANN和SNN都会默认创建 fsimage_000000000000000这样的镜像文件

? 2)当我们操作HDFS的时候ANN会产生日志信息edits_inprogress_0000000000001

? 3)主节点会将日志文件中新增的数据同步到JournalNode集群上

? 4)所以只需要snn有操作的日志信息,就可以合并fsImage与edits信息,理论上是一直在合并数据

? fsimage -->初始化创建

? edits–>从JournalNode集群上定时同步

? 现在只要同步到edits文件,就开始于fsimage合并

? 当达到阈值的时候,直接拍摄快照即可

? 5) SNN将合并好的Fsimage发送给ANN,ANN验证无误后,存放到自己的目录中

? 在这里插入图片描述

DataNode(DN)

? a. 存储文件的Block数据

? b. 介质:硬盘

? c. 启动时:同时向两个NN汇报Block信息

? d. 运行中:同时和两个主节点保持心跳机制

JournalNode(JN)

在这里插入图片描述

? a. Quorum JournalNode Manager 共享存储系统,NameNode通过共享存储系统实现日志数据同 步。

? b. JournalNode是一个独立的小集群,它的实现原理和Zookeeper的一致( Paxos)

? c. ANN产生日志文件的时候,就会同时发送到 JournalNode的集群中每个节点上

? d. JournalNode不要求所有的jn节点都接收到日志,只要有半数以上的(n/2+1)节点接受收到日 志,那么本条日志就生效

? e. SNN每间隔一段时间就去QJM上面取回最新的日志

? SNN上的日志有可能不是最新的

? f. HA集群的状态正确至关重要,一次只能有一个NameNode处于活动状态

? g. JournalNode只允许单个NameNode成为活跃的主节点。在故障转移期间,将变为活动状态的NameNode 将承担写入JournalNodes的角色,这将有效地防止另一个NameNode继续处于活动状态,从而使 新的Active节点可以安全地进行故障转移。

Failover Controller(ZKFC:故障转移控制器)

# 对 NameNode 的主备切换进行总体控制,能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换,为了防止因为NN的GC失败导致心跳受影响,ZKFC作为一个deamon进程从NN分离出来

在这里插入图片描述

1) 启动时:

? 当集群启动时,主备节点的概念是很模糊的
? 当ZKFC只检查到一个节点是健康状态,直接将其设置为主节点
? 当zkfc检查到两个NN节点是的健康状态,发起投票机制
? 选出一个主节点,一个备用节点,并修改主备节点的状态

一个常见的问题:如果启动的时候,发现有两个备用节点,肯定是因为没有选举,检查一下zookeeper有没有启动起来,如果还是失败,重启一下

问题:

? 1、SNN为什么不直接去ANN去同步数据。

? 2、SNN挂了谁去合并日志呢?没有人

2) 运行时:

由 ZKFailoverController、HealthMonitor 和 ActiveStandbyElector 这 3 个组件来协同实现主备切换

? a. ZKFailoverController启动的时候会创建 HealthMonitor 和 ActiveStandbyElector 这两 个主要的内部组件
? b. HealthMonitor 主要负责检测 NameNode 的健康状态

? c. ActiveStandbyElector 主要负责完成自动的主备选举,内部封装了 Zookeeper 的处理逻辑

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lB0jjg21-1659443580914)(C:\Users\GAO\Desktop\笔记\笔记图片\image-20220524220553126.png)]

3) 主备节点正常切换
NameNode 在选举成功后, ActiveStandbyElector 会在 zk 上创建一个
ActiveStandbyElectorLock 临时节点,而没有选举成功的备 NameNode 中的ActiveStandbyElector 会监控这个节点

? 如果 Active NameNode 对应的 HealthMonitor 检测到 NameNode 的状态异常时,ZKFailoverController 会主动删除当前在 Zookeeper 上建立的临时节点ActiveStandbyElectorLock

? 处于 Standby 状态的 NameNode 的 ActiveStandbyElector 注册的监听器就会收到这个节点的 NodeDeleted 事件,并创建 ActiveStandbyElectorLock 临时节点,本来处于 Standby 状态的 NameNode 就选举为 Active NameNode 并随后开始切换为 Active 状态。

? 如果是 Active NameNode 的机器整个宕掉的话,那么跟 zookeeper 连接的客户端线程也挂了 , 会话结束 , 那么根据 Zookeepe 的临时节点特性, ActiveStandbyElectorLock 节点会自动被删除,从而也会自动进行一次主备切换,

Zookeeper
为主备切换控制器提供主备选举支持。
辅助投票
和 ZKFC 保持心跳机制,确定 ZKFC 的存活

在这里插入图片描述

1.4 脑裂brain-split

定义
脑裂是 Hadoop2.X 版本后出现的全新问题,实际运行过程中很有可能出现两个 namenode 同时服务于整个集群的情况,这种情况称之为脑裂。

原因
脑裂通常发生在主从 namenode 切换时,由于 ActiveNameNode 的网络延迟、设备故障等问题,另一个 NameNode 会认为活跃的 NameNode 成为失效状态,此时 StandbyNameNode 会转换成活跃状态,此时集群中将会出现两个活跃的 namenode 。因此,可能出现的因素有网络延迟、心跳故障、设备故障等。

脑裂场景
NameNode 可能会出现这种情况, NameNode 在垃圾回收( GC )时,可能会在长时间内整个系统无响应
zkfc 客户端也就无法向 zk 写入心跳信息,这样的话可能会导致临时节点掉线,备 NameNode会切换到 Active 状态
这种情况可能会导致整个集群会有同时有两个 Active NameNode

脑裂问题的解决方案是隔离( Fencing )

  1. 第三方共享存储:任一时刻,只有一个 NN 可以写入;
  2. DataNode :需要保证只有一个 NN 发出与管理数据副本有关的命令;
  3. Client 需要保证同一时刻只有一个 NN 能够对 Client 的请求发出正确的响应。
    a) 每个 NN 改变状态的时候,向 DN 发送自己的状态和一个本次选举对应的序列号。
    b) DN 在运行过程中维护此序列号,当 failover 时,新的 NN 在返回 DN 心跳时会返回自己的 active 状态和一个更大的序列号。
    DN 接收到这个返回是认为该 NN 为新的 active 。
    ? 如果这时原来的 active (比如 GC )恢复,返回给 DN 的心跳信息包含 active 状态和原来的序列号,这时 DN 就会拒绝这个 NN 的命令。
    解决方案
    ActiveStandbyElector 为了实现 fencing ,当 NN 成为 ANN 之后创建 Zookeeper 临时节点ActiveStandbyElectorLock ,创建 ActiveBreadCrumb 的持久节点,这个节点里面保存了这个Active NameNode 的地址信息 (node-01)
    Active NameNode 的 ActiveStandbyElector 在正常的状态下关闭 Zookeeper Session 的时候,会一起删除这个持久节点
    但如果 ActiveStandbyElector 在异常的状态下关闭,那么由于 /hadoop ha/${dfs.nameservices}/ActiveBreadCrumb 是持久节点,会一直保留下来,后面当另一个NameNode 选主成功之后,会注意到上一个 Active NameNode 遗留下来的这个节点,从而会回调 ZKFailoverController 的方法对旧的 Active NameNode 进行 fencing 。
    首先尝试调用这个旧 Active NameNode 的 HAServiceProtocol RPC 接口的transitionToStandby 方法,看能不能把它转换为 Standby 状态;
    如果 transitionToStandby 方法调用失败,那么就执行 Hadoop 配置文件之中预定义的隔离措施。
  4. sshfence :通过 SSH 登录到目标机器上,执行命令 fuser 将对应的进程杀死
  5. shellfence :执行一个用户自定义的 shell 脚本来将对应的进程隔离
    在成功地执行完成 fencing 之后,选主节点成功的 ActiveStandbyElector 才会回调ZKFailoverController 的 becomeActive 方法将对应的 NameNode 转换为 Active 状态,开始对外提供服务。新的主创建临时节点 ActiveStandbyElectorLock ,创建持久化节点 ActiveBreadCrumb ,并将自己的主机地址 Node02 赋值给初始化节点

二、Hadoop-federation

在这里插入图片描述

HDFS federation就是使得HDFS支持多个命名空间,并且允许在HDFS中同时存在多个Name Node

2.1 单NN局限性

Namespace(命名空间)的限制

单个datanode从4T增长到36T,集群的尺寸增长到8000个datanode。存储的需求从12PB增长到大于100PB。

性能的瓶颈

由于是单个Namenode的HDFS架构,因此整个HDFS文件系统的吞吐量受限于单个Namenode的吞吐量。毫无疑问,这将成为下一代MapReduce的瓶颈。

隔离问题

由于HDFS仅有一个Namenode,无法隔离各个程序,因此HDFS上的一个实验程序就很有可能影响整个HDFS上运行的程序。那么在HDFS Federation中,可以用不同的Namespace来隔离不同的用户应用程序,使得不同Namespace Volume中的程序相互不影响。

集群的可用性

在只有一个Namenode的HDFS中,此Namenode的宕机无疑会导致整个集群不可用。

Namespace和Block Management的紧密耦合

当前在Namenode中的Namespace和Block Management组合的紧密耦合关系会导致如果想要实现另外一套Namenode方案比较困难,而且也限制了其他想要直接使用块存储的应用。

为什么纵向扩展目前的Namenode不可行?比如将Namenode的 JVM Heap空间扩大到512GB。

这样纵向扩展带来的第一个问题就是启动问题,启动花费的时间太长。当前具有50GB Heap Namenode的HDFS启动一次大概需要30分钟到2小时,那512GB的需要多久?第二个潜在的问题就是Namenode在Full GC时,如果发生错误将会导致整个集群宕机。第三个问题是对大JVM Heap进行调试比较困难。优化Namenode的内存使用性价比比较低。

2.2 federation

在这里插入图片描述

NameNode提供了命名空间(NameSpace)和块(Block)管理功能。
HDFS 联邦拥有多个独立的命名空间,每个空间管理属于自己的一组块,这些同组的块构成了“块 池”(Block Pool)。
所有的NameNode 会共享底层的数据节点的存储资源。数据节点是一个物理概念,块池则属于逻辑概念。

Block pool(块池)

? 所谓Block pool(块池)就是属于单个命名空间的一组block(块)管理区域。
? 每一个datanode为所有的block pool存储块。
? Datanode是一个物理概念,而block pool是一个重新将block划分的逻辑概念。

? 同一个datanode中可以存着属于多个block pool的多个块。

? Block pool允许一个命名空间在不通知其他命名空间的情况下为一个新的block创建Block ID。同时,一个Namenode失效不会影响其下的datanode为其他Namenode的服务。
? 当datanode与Namenode建立联系并开始会话后自动建立Block pool。每个block都有一个唯一的标识,这个标识我们称之为扩展的块ID(Extended Block ID)= BlockID+BlockID。这个扩展的块ID在HDFS集群之间都是唯一的,这为以后集群归并创造了条件。
? Datanode中的数据结构都通过块池ID(BlockPoolID)索引,即datanode中的BlockMap,storage等都通过BPID索引。
? 在HDFS中,所有的更新、回滚都是以Namenode和BlockPool为单元发生的。即同一HDFS Federation中不同的Namenode/BlockPool之间没有什么关系。
? 目前Hadoop版本中Block Pool的管理功能依然放在了Namenode中,将来的版本中会将Block Pool的管理功能移动的新的功能节点中。

Namespace Volume(命名空间卷)

? 一个Namespace和它的块池合并在一起称为Namespace Volume,它是一个独立完整的管理单元。

? 当一个Namenode/Namespace被删除,与之相对应的块池也被删除。

? 在升级时,每一个nanespace Volume也会整体作为一个单元。

通过多个namenode/namespace把元数据的存储和管理分散到多个节点中

? 降低单个节点数据压力,计算压力

namenode/namespace可以通过增加机器来进行水平扩展

? 可以让更多的节点参与到运算

? namespace命名空间,通过这种方式确定要处理数据的路径

我们可以通过namenode和namespace组合使用

? 所有的nn共享dn

? 但是每一个namespace会单独管理自己的模块

? 会创建一个管理块的机制:blocks pool

注意:联邦机制和HA完全不一样的,是解决不一样的问题,联邦机制解决的是业务水平扩展的问题,HA解决的是单点故障的问题

Hadoop (三)

Hadoop-HA问题:

dfs.ha.fencing.methods
表示:a list of scripts or Java classes which will be used to fence the Active NameNode during a failover

而配置为shell(true)就是直接返回隔离成功,即表示没进行任何操作,为什么不会导致脑裂现象的发生,这是因为Quorun Journal方式内置了fencing功能,不需要实现单独的fencing机制(epoch number解决互斥问题)。
而如果使用共享存储NAS+NFS那种方式的话,就需要配置具体的真正有fencing功能的,比如:sshfence,下面是sshfence的说明:

sshfence - SSH to the Active NameNode and kill the process
The sshfence option SSHes to the target node and uses fuser to kill the process listening on the service’s TCP port. In order for this fencing option to work, it must be able to SSH to the target node without providing a passphrase. Thus, one must also configure the dfs.ha.fencing.ssh.private-key-files option, which is a comma-separated list of SSH private key files. 即配置sshfence需要两个namenode之间配置无密码认证,如下:(hdfs-site.xml)

   <property>
      <name>dfs.ha.fencing.methods</name>
      <value>sshfence</value>
    </property>

    <property>
      <name>dfs.ha.fencing.ssh.private-key-files</name>
      <value>/root/.ssh/id_rsa</value>
    </property>

但如果只配置sshfence,如果在机器宕机后不可达,则sshfence会返回false,即fence失败,所以得要配置成:

   <property>
      <name>dfs.ha.fencing.methods</name>
        <value>
            sshfence
            shell(/bin/true)
        </value>
    </property>

这样子配置,顺序执行时,如果可达就执行sshfence执行杀死namenode后返回true,不可达就直接shell(true)返回true。

一、MapReduce设计理念

map—>映射

reduce—>归纳

mapreduce必须构建在hdfs之上的一种大数据离线计算框架

? 在线:实时数据处理

? 离线:数据处理时效性没有在线那么强,但是相对也需要很快得到结果

mapreduce不会马上得到结果,他会有一定的延时(磁盘IO)

? 如果数据量小,使用mapreduce反而不合适

? 杀鸡焉用宰牛刀

原始数据–>map(Key,Value)–>Reduce

分布式i计算

? 将大的数据切分成多个小数据,交给更多的节点参与运算

计算向数据靠拢

? 将计算传递给有数据的节点上进行工作

二、MapReduce架构特点

MapReduce1.x

在这里插入图片描述

JobTracker

主节点,单点,负责调度所有的作用和监控整个集群的资源负载。

TaskTracker

从节点,自身节点资源管理和JobTracker进行心跳联系,汇报资源和获取task。

Client

以作业为单位,规划作业计算分布,提交作业资源到HDFS,最终提交作业到JobTracker。

Slot(槽):

? 属于JobTracker分配的资源(计算能力、IO能力等)。

? 不管任务大小,资源是恒定的,不灵活但是好管理。

Task(MapTask–>ReduceTask):

? 开始按照MR的流程执行业务。

? 当任务完成时,JobTracker告诉TaskTracker回收资源。

MapReduce1.x的弊端

1.JobTracker负载过重,存在单点故障。

2.资源管理和计算调度强耦合,其它计算框架难以复用其资源管理。

3.不同框架对资源不能全局管理。

MapReduce2.x

在这里插入图片描述

Client:
客户端发送MR任务到集群,其中客户端有很多种类,例如hadoop jar

ResourceManager:
资源协调框架的管理者,分为主节点和备用节点(防止单点故障,主备的切换基于ZK的管理),它时刻与NodeManager保持心跳,接受NodeManager的汇报(NodeManager当前节点的资源情况)。

? 当有外部框架要使用资源的时候直接访问ResourceManager即可。

? 如果是有MR任务,先去ResourceManager申请资源,ResourceManager根据汇报分配资源,例如资源在NodeManager1,那么NodeManager1要负责开辟资源。

Yarn(NodeManager):
Yarn(Yet Another Resource Negotiator,另一种资源协调者),统一管理资源。以后其他的计算框架可以直接访问yarn获取当前集群的空闲节点。

? 每个DataNode上默认有一个NodeManager,NodeManager汇报自己的信息到ResourceManager。

Container:
它是动态分配的,2.X资源的代名词。

ApplicationMaster:
我们本次任务的主导者,负责调度本次被分配的资源Container。当所有的节点任务全部完成,applicaion告诉ResourceManager请求杀死当前ApplicationMaster线程,本次任务的所有资源都会被释放。

Task(MapTask–ReduceTask):
开始按照MR的流程执行业务,当任务完成时,ApplicationMaster接收当前节点的反馈。

YARN【Yet Another Resource Negotiator】:Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的。

核心思想:将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现:

ResourceManager:负责整个集群的资源管理和调度。

ApplicationMaster:负责应用程序相关的事务,比如任务调度、任务监控和容错等。

YARN的引入,使得多个计算框架可运行在一个集群中 每个应用程序对应一个ApplicationMaster 目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等。

三、Hadoop搭建yarn环境(我们已经搭建好了,回顾配置文件)

四、扑克牌的问题

在这里插入图片描述

你想数出一摞牌中有多少张黑桃,红桃,方块,梅花。直观方式是一张一张检查并且数出分别有多少张。
MapReduce方法则是:
1.给在座的所有玩家中分配这摞牌
2.让每个玩家数自己手中的牌有几张是黑桃,然后把这个数目汇报给你
3.你把所有玩家告诉你的数字加起来,得到最后的结论

五、MR的计算流程

计算1T数据中每个单词出现的次数---->wordcount

在这里插入图片描述

5.1 原始数据File(可以从网上找一篇英文的文章)

The books chronicle the adventures of the adolescent wizard Harry Potter and his best friends Ron Weasley and Hermione Granger, all of whom are students at Hogwarts School of Witchcraft and Wizardry. 

1T数据被切分成块存放在HDFS上,每一个块有128M大小

5.2 数据块Block

block块是hdfs上数据存储的一个单元,同一个文件中块的大小都是相同的

因为数据存储到HDFS上不可变,所以有可能块的数量和集群的计算能力不匹配

我们需要一个动态调整本次参与计算节点数量的一个单位

我们可以动态的改变这个单位––>参与的节点

5.3 切片Split

目的:动态地控制计算单元的数量
在这里插入图片描述

切片是一个逻辑概念

在不改变现在数据存储的情况下,可以控制参与计算的节点数目

通过切片大小可以达到控制计算节点数量的目的

? 有多少个切片就会执行多少个Map任务

一般切片大小为Block的整数倍(2 1/2)

? 防止多余创建和很多的数据连接

如果Split大小 > Block大小 ,计算节点少了

如果Split大小 < Block大小 ,计算节点多了

默认情况下,Split切片的大小等于Block的大小 ,默认128M,如果读取到最后一个block块的时候,与前一个blokc块组合起来的大小小于128M*1.1的话,他们结合生一个split切片,生成一个map任务

一个切片对应一个MapTask

5.4 MapTask

map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中(map种的逻辑作用在每一行上)

我们可以根据自己书写的分词逻辑(空格,逗号等分隔),计算每个单词出现的次数(wordcount)

这时会产生(Map<String,Integer>)临时数据,存放到内存中

the books chronicle the adventures of the adolescent wizard Harry Potter and his best friends Ron Weasley and Hermione Granger, all of whom are students at Hogwarts School of Witchcraft and Wizardry

the 1
books 1
chronicle 1
the 1
adventures 1
of 1
...
Wizardry 1

但是内存的大小是有限的,如果每个任务随机的去占用内存,会导致内存不可控。多个任务同时执行有可能内存溢出(OOM)

如果把数据都直接放到硬盘,效率太低

所以想个方案,内存和硬盘结合,我们要做的就是在OOM和效率低之间提供一个有效方案,可以先往内存中写入一部分数据,然后写出到硬盘

5.5 环形缓冲区(KV-Buffer)

在这里插入图片描述

可以循环利用这块内存区域,减少数据溢写时map的停止时间

? 每一个Map可以独享的一个内存区域

? 在内存中构建一个环形数据缓冲区(kvBuffer),默认大小为100M

? 设置缓冲区的阈值为80%(设置阈值的目的是为了同时写入和写出),当缓冲区的数据达到80M开始向外溢写到硬盘

? 溢写的时候还有20M的空间可以被使用效率并不会被减缓

? 而且将数据循环写到硬盘,不用担心OOM问题

5.6 分区Partition(环形缓冲区做的)

在这里插入图片描述

根据Key直接计算出对应的Reduce

分区的数量和Reduce的数量是相等的

hash(key) % partation(reduce的数量) = num

默认分区的算法是Hash然后取余

? Object的hashCode()—equals()

? 如果两个对象equals,那么两个对象的hashcode一定相等

? 如果两个对象的hashcode相等,但是对象不一定equlas

5.7 排序Sort(环形缓冲区做的,快速排序,对前面分区后的编号进行排序,使得相同编号的在一起)

对要溢写的数据进行排序(QuickSort)

按照先Partation后Key的顺序排序–>相同分区在一起,相同Key的在一起

我们将来溢写出的小文件也都是有序的

5.8 溢写Spill

将内存中的数据循环写到硬盘,不用担心OOM问题

每次会产生一个80M的文件

如果本次Map产生的数据较多,可能会溢写多个文件

5.9 合并Merge

因为溢写会产生很多有序(分区 key)的小文件,而且小文件的数目不确定

后面向reduce传递数据带来很大的问题

所以将小文件合并成一个大文件,将来拉取的数据直接从大文件拉取即可

合并小文件的时候同样进行排序(归并 排序),最终产生一个有序的大文件

5.10 组合器Combiner

a. 集群的带宽限制了mapreduce作业的数量,因此应该尽量避免map和reduce任务之间的数据传输,hadoop允许用户对map的输出数据进行处理,用户可自定义combiner函数(如同map函数和reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner的输出作为reduce的输入,很多情况下可以i直接将reduce函数作为conbiner函数来试用(job.setCombinerClass(FlowCountReducer.class))。

b. combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件时调用combiner函数,也可以在溢出的小文件合并成大文件时调用combiner,但是要保证不管调用多少次,combiner函数都不影响最终的结果,所以不是所有处理逻辑都可以i使用combiner组件,有些逻辑如果试用了conbiner函数会改变最后reduce的输出结果(如求几个数的平均值,就不能先用conbiner求一次各个map输出结果的平均值,再求这些平均值的平均值,那样会导致结果的错误)。

c. combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量:

? 原先传给reduce的数据时a1 a1 a1 a1 a1

? 第一次combiner组合后变成a(1,1,1,1,1)

? 第二次combiner后传给reduce的数据变为a(5,5,6,7,23,…)

5.11 拉取Fetch

我们需要将Map的临时结果拉取到Reduce节点

第一种方式:两两合并
第二种方式:相同的进一个reduce
第三种对第二种优化,排序
第四种对第三种优化:如果一个reduce处理两种key,而key分布一个首一个尾,解决不连续的问题,给个编号,这个编号怎么算呢,`回到分区,排序`

原则

? 相同的Key必须拉取到同一个Reduce节点

? 但是一个Reduce节点可以有多个Key

未排序前拉取数据的时候必须对Map产生的最终的合并文件做全序遍历

? 而且每一个reduce都要做一个全序遍历

如果map产生的大文件是有序的,每一个reduce只需要从文件中读取自己所需的即可

5.12 合并Merge

因为reduce拉取的时候,会从多个map拉取数据

那么每个map都会产生一个小文件,这些小文件(文件与文件之间无序,文件内部有序)

为了方便计算(没必要读取N个小文件),需要合并文件

归并算法合并成2个(qishishilia)

相同的key都在一起

5.13 归并Reduce

将文件中的数据读取到内存中

一次性将相同的key全部读取到内存中

直接将相同的key得到结果–>最终结果

5.14 写出Output

每个reduce将自己计算的最终结果都会存放到HDFS上

5.15 MapReduce过程截图

在这里插入图片描述

总结

在这里插入图片描述

六、MapReduce程序编写(统计单词个数)

七、IK分词器(统计三国演义指定词语个数)

八、MapReduce案例

8.1 好友推荐系统

固定类别推荐

? 莫扎特---->钢琴---->贝多芬----->命运交响曲

数据量

? QQ好友推荐—>

? 每个QQ200个好友

? 5亿QQ号

解决思路:

? 需要按照行进行计算

? 将相同推荐设置成相同的key,便于reduce统一处理

数据:

tom hello hadoop cat
world hello hadoop hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world hello
hello tom world hive mr

在这里插入图片描述

分析:

我们需要在map阶段根据用户的直接联系和间接关系列举出来,map输出的为tom:hadoop 1,hello:hadoop 0,0代表间接关系,1代表直接关系。在Reduce阶段把直接关系的人删除掉,再输出。

具体实现

package com.shujia.tuijian;

import com.sun.org.apache.bcel.internal.generic.ACONST_NULL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.lang.management.LockInfo;

class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
 private Text mkey = new Text();
 private LongWritable mvalue = new LongWritable();


 @Override
 protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
     //tom hello hadoop cat
     /**
         * tom:hello 1
         * tom:hadoop 1
         * tom:cat 1
         *
         * hello:hadoop -1
         * hello:cat -1
         * hadoop:cat -1
         */
        //hadoop tom hive world hello
        /**
         * hadoop:tom 1
         * hadoop:hive 1
         * hadoop:world 1
         * hello:hadoop 1
         */
        //将value类型的数据转成String类型
        String line = value.toString();
        String[] strings = line.split(" ");
        for (int i = 1; i < strings.length; i++) {
            //处理直接好友
            mkey.set(orderFriend(strings[0], strings[i]));
            mvalue.set(1L);
            context.write(mkey, mvalue);
            //处理间接好友
            for (int j = i + 1; j < strings.length; j++) {
                mkey.set(orderFriend(strings[i], strings[j]));
                mvalue.set(-1L);
                context.write(mkey, mvalue);
            }
        }
    }

    //目的是将map输出的key进行统一
    //hadoop:hello 1
    //hello:hadoop 1
    public static String orderFriend(String f1, String f2) {
        //比较字符串的大小
        int i = f1.compareTo(f2);
        if (i > 0) {
            return f1 + ":" + f2;
        } else {
            return f2 + ":" + f1;
        }
    }
}

class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    //(hive:hello [-1,-1,1])
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //hadoop:hello 1
        //hadoop:hello -1
        int flag = 0;
        Long sum = 0L;
        for (LongWritable value : values) {
            if (value.get() == 1) {
                flag = 1;
            }
            sum += value.get();
        }

        if (flag == 0) {
            context.write(key, new LongWritable(-1L));
        } else {
            context.write(key, new LongWritable(0L));
        }
    }
}

/*
    计算出给出的数据中有多少直接关系的组合和间接关系的组合,
    将组合作为key
    直接关系,value是1
    间接关系,value是-1
 */
public class TuiJianDemo {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TuiJianDemo.class);
        job.setNumReduceTasks(1);
        job.setJobName("推荐任务");

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

8.2 PM2.5平均值

package com.shujia.airPM25;

import com.shujia.tuijian.TuiJianDemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

//20220527:1001 20
//20220527:1001 30
//20220527:1001 40
//...
class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //将一行的数据转成String类型
        String line = value.toString();
        String[] strings = line.split(",");
        //过滤出PM2.5对应的数据
        if (strings.length >= 4 && "PM2.5".equals(strings[2])) {
            for (int i = 3, j = 1001; i < strings.length; i++, j++) {
                //对一行数据做简单的清洗,因为有的时间没有监控到PM2.5的值
                if ("".equals(strings[i]) || strings[i] == null || " ".equals(strings[i])) {
                    strings[i] = "0";
                }

                context.write(new Text("date: " + strings[0] + "-城市编号:" + j), new LongWritable(Long.parseLong(strings[i])));
            }
        }
    }
}

class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        Long sum = 0L;
        for (LongWritable value : values) {
            long l = value.get();
            sum += l;
        }
        //除以24得到该城市当天的PM2.5平均值
        long avg = sum / 24;
        context.write(key, new LongWritable(avg));
    }
}


public class Pm25Avg {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Pm25Avg.class);
        job.setNumReduceTasks(1);
        job.setJobName("计算每个城市每天的pm2.5平均值");

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

提升:两张表合并或者多张表

思路:map程序每次都是读取一行数据,读取两个表内的数据,可以根据数据来源(文件名称)判断当前读取的数据是来自哪一张表,然后打上标记送入reduce去处理,map输出的key是id值,value是id对应的数据。

reduce接收到的数据是<key,{value1,value2,value3,…}>,key是输入的id的值,value中包含表一的id对应的数据,也包含表二的id对应的数据,我们可以通过map里打的标记进行区分,分别记录到list1与list2中,然后将两个list中的数据进行笛卡尔积就能得到一个id连接后的数据,将所有id都进行这样的操作,就能把整个表都处理完

Combiner

job.setCombinerClass

九、MapReduce源码分析(高级部分)

快捷键

ctrl+alt+方向键:查看上一个或者下一个方法
ctrl+shift+alt+c: 拷贝方法的全名   com.shujia.airPM25.Pm25Avg#main
ctrl+alt+b:查看当前接口的实现类

9.1 Split

带着问题看源码:

1、map的数量和切片的数量一样?

2、split的大小可以自己调节吗?算法是什么?

源代码的分析从提交任务开始

job.waitForCompletion(true);

org.apache.hadoop.mapreduce.Job#waitForCompletion

  /**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,ClassNotFoundException {
    //判断当前的状态
    if (state == JobState.DEFINE) {
        //=============关键代码================
      submit();
    }
    //监控任务的运行状态
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
      //返回任务状态
    return isSuccessful();
  }

org.apache.hadoop.mapreduce.Job#submit

public void submit() throws IOException, InterruptedException, ClassNotFoundException {
    //确认当前任务的状态
    ensureState(JobState.DEFINE);
    //mapreduce1.x和2.x,但是2的时候将1的好多方法进行了优化
    setUseNewAPI();
    //获取当前任务所运行的集群
    connect();
    //创建Job的提交器
    final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException,  ClassNotFoundException {
        //提交任务到系统去执行 
		//Internal method for submitting jobs to the system
        //===========关键代码============
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    //任务的状态修改为运行
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

org.apache.hadoop.mapreduce.JobSubmitter#submitJobInternal

	//validate the jobs output specs 
	//检查一下输出路径存不存在呀,有没有权限之类的
    checkSpecs(job);
    //生成并设置新的JobId
	JobID jobId = submitClient.getNewJobID();
	job.setJobID(jobId);
	//获取任务的提交目录
	Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
	//===========关键代码============ 197行
    int maps = writeSplits(job, submitJobDir);
    //设置map的数量,其中map的数量就等于切片的数量
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

org.apache.hadoop.mapreduce.JobSubmitter#writeSplits

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
    //获取作业的配置文件
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //今后我们看源码的时候,想都不要想,看新的方式
    if (jConf.getUseNewMapper()) {
      //===========关键代码============
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

org.apache.hadoop.mapreduce.JobSubmitter#writeNewSplits

  private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,InterruptedException, ClassNotFoundException {
    //获取集群配置
    Configuration conf = job.getConfiguration();
    //通过反射工具获取文件读取器对象
    //===========关键代码============ job 的实现类
    //org.apache.hadoop.mapreduce.lib.input.TextInputFormat --> input
    //job->org.apache.hadoop.mapreduce.task.JobContextImpl#getInputFormatClass
    InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    //获取到切片
    //===========关键代码============ getSplits
    List<InputSplit> splits = input.getSplits(job);
    //转成数组
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    
    //返回的是数组的长度,对应着切片的数量,回到197行验证
    return array.length;
  }

org.apache.hadoop.mapreduce.task.JobContextImpl#getInputFormatClass

  /**
   * Get the {@link InputFormat} class for the job.
   * 
   * @return the {@link InputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?,?>> getInputFormatClass() throws ClassNotFoundException {
      return (Class<? extends InputFormat<?,?>>) 
      //getClass的操作是如果有值返回值,没有的话使用默认值
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

org.apache.hadoop.mapreduce.lib.input.FileInputFormat#getSplits

  /** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    //开始计算两个变量(一个切片最少有一个字节,一个最小切片值也是1)
    //1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //Long.MAX_VALUE
    long maxSize = getMaxSplitSize(job);

    // generate splits
    //创建一个List存放切片
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //获取本次计算中所有的要计算的文件
    List<FileStatus> files = listStatus(job);
    //首先取出一个文件
    for (FileStatus file: files) {
      //获取文件路径
      Path path = file.getPath();
      //获取文件大小
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          //获取文件块的信息
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //判断文件是否可以被切分,比如文件被压缩了,需要解压缩才可以
        if (isSplitable(job, path)) {
          //获取单个块的大小
          long blockSize = file.getBlockSize();
          //开始计算切片大小,这里可以验证切片大小与block大小一样
          //思考如何生成256M的切片
          
            
          //如果切片小于blocksize-->将maxsize小于blocksize
          //如果切片大于blocksize-->将minsize大于blocksize
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          //将文件大小分配给bytesRemaining
          long bytesRemaining = length;
          //private static final double SPLIT_SLOP = 1.1;   // 10% slop
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            //制作切片
            //封装切片对象并将其存放到list中
            //makeSplit(路径,偏移量,切片大小,块的位置,备份的位置);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          //如果最后一个文件过小没有大于1.1,就与上一个一起生成切片
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          //如果文件不可切,就生成一个切片
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    //返回切片(后面的代码我们跟不进去了,是yarn上面的了)
    return splits;
  }

计算切片大小逻辑

org.apache.hadoop.mapreduce.lib.input.FileInputFormat#computeSplitSize

 protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    //blockSize--128M
    //maxSize--56M ----> 56M
    //minSize--256M ----> 256M
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

9.2 Map源码-MapTask

带着问题:

1、map读取数据按照行读取数据?验证一下

2、如果切片把一行数据放在了两个切片中呢?怎么办?

3、map里面的第一个参数类型是LongWritable?哪里指定的?

org.apache.hadoop.mapred.MapTask

在这里插入图片描述

  @Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    //判断是否为Map任务
    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      //判断reduce数量是否等于0,有可能等于0的如果我们只是清洗数据,就不需要
      if (conf.getNumReduceTasks() == 0) {
        //map所占的比例100%,没有reducce就不用分区了
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        //如果有reduce的话分区排序
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    //任务报告一下,说明我要处理多少数据
    TaskReporter reporter = startReporter(umbilical);
 
    //使用新api
    boolean useNewApi = job.getUseNewMapper();
    //===========关键代码============
    //使用新api进行初始化
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      //===========关键代码============
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

org.apache.hadoop.mapred.Task#initialize

 public void initialize(JobConf job, JobID id, 
                         Reporter reporter,
                         boolean useNewApi) throws IOException, 
                                                   ClassNotFoundException,
                                                   InterruptedException {
    //获取作业的上下文
    jobContext = new JobContextImpl(job, id, reporter);
    //获取任务的上下文
    taskContext = new TaskAttemptContextImpl(job, taskId, reporter);
    if (getState() == TaskStatus.State.UNASSIGNED) {
      setState(TaskStatus.State.RUNNING);
    }
    if (useNewApi) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("using new api for output committer");
      }
      //创建了一个outputFormat对象
      outputFormat = ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
      committer = outputFormat.getOutputCommitter(taskContext);
    } else {
      committer = conf.getOutputCommitter();
    }
    Path outputPath = FileOutputFormat.getOutputPath(conf);
    if (outputPath != null) {
      if ((committer instanceof FileOutputCommitter)) {
        FileOutputFormat.setWorkOutputPath(conf, 
          ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
      } else {
        FileOutputFormat.setWorkOutputPath(conf, outputPath);
      }
    }
    committer.setupTask(taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz =
        conf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
            null, ResourceCalculatorProcessTree.class);
    pTree = ResourceCalculatorProcessTree
            .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
    if (pTree != null) {
      pTree.updateProcessTree();
      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
    }
  }

org.apache.hadoop.mapred.MapTask#runNewMapper

@SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper--com.shujia.MyMapper
   //对应自己写的map类  TaskAttemptContextImpl
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    //org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    //获取切片
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    //===========关键代码============ NewTrackingRecordReader
    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      //如果reduce数量等于0,直接输出
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      //如果reduce数量不等于0,待会来看,看下面的初始化
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      //===========关键代码============
      //初始化的时候有意识的将第一行省略了
      input.initialize(split, mapperContext);
      //实际上调用的就是我们自己重写的map方法
      //===========关键代码============
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }

org.apache.hadoop.mapred.MapTask.NewTrackingRecordReader#NewTrackingRecordReader

NewTrackingRecordReader(org.apache.hadoop.mapreduce.InputSplit split,
    org.apache.hadoop.mapreduce.InputFormat<K, V> inputFormat,
    TaskReporter reporter,
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
    throws InterruptedException, IOException {
  this.reporter = reporter;
  this.inputRecordCounter = reporter
      .getCounter(TaskCounter.MAP_INPUT_RECORDS);
  this.fileInputByteCounter = reporter
      .getCounter(FileInputFormatCounter.BYTES_READ);

  List <Statistics> matchedStats = null;
  if (split instanceof org.apache.hadoop.mapreduce.lib.input.FileSplit) {
    matchedStats = getFsStatistics(((org.apache.hadoop.mapreduce.lib.input.FileSplit) split)
        .getPath(), taskContext.getConfiguration());
  }
  fsStats = matchedStats;

  long bytesInPrev = getInputBytes(fsStats);
  
  //===========关键代码============
  //真正工作的人是谁,创建一个记录读取器
  //返回的是一个行记录读取器
  this.real = inputFormat.createRecordReader(split, taskContext);
  long bytesInCurr = getInputBytes(fsStats);
  fileInputByteCounter.increment(bytesInCurr - bytesInPrev);
}

org.apache.hadoop.mapreduce.RecordReader

@Override
public RecordReader<LongWritable, Text> 
  createRecordReader(InputSplit split,
                     TaskAttemptContext context) {
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  return new LineRecordReader(recordDelimiterBytes);
}

org.apache.hadoop.mapreduce.lib.input.LineRecordReader#initialize

public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  //获取开始的位置
  start = split.getStart();
  end = start + split.getLength();
  final Path file = split.getPath();

  // open the file and seek to the start of the split
  //获取分布式文件系统
  final FileSystem fs = file.getFileSystem(job);
  //获取一个输入流
  fileIn = fs.open(file);
  
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  if (null!=codec) {
    isCompressedInput = true;  
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    //读取偏移量
    fileIn.seek(start);
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  //解决第二个问题 从第二行开始读,把切片的第一行将给上一个切片去读
  if (start != 0) {
    //返回的start正好是下一行数据的开头
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}

org.apache.hadoop.mapreduce.lib.map.WrappedMapper

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      return mapContext.nextKeyValue();
    }

org.apache.hadoop.mapreduce.task.MapContextImpl#MapContextImpl

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    //最终调用的是LineRecordReader中的nextKeyValue方法
    return reader.nextKeyValue();
  }

org.apache.hadoop.mapreduce.lib.input.LineRecordReader

public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  //设置当前的偏移量
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  //循环读取数据
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      //pos是当前数据的定位,value是数据
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }

    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + 
             (pos - newSize));
  }
  if (newSize == 0) {
    //如果本次啥也没有读到,返回false
    key = null;
    value = null;
    return false;
  } else {
    //读到了返回true
    return true;
  }
}

//看完这里回到map方法,key就是数据的偏移量,value就是一行数据,context上下文,写到环形缓冲区,map结束

9.3 KV-Buffer

通过context中的write方法进行写

带着问题:

1、分区的数量和reduce数量一样?

2、环形缓冲区内存大小100M?80%溢写?可以自己设置吗?

3、排序是快速排序?

4、怎么分区的?Hash?

org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl

  /**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    output.write(key, value);
  }

通过参数的个数

org.apache.hadoop.mapred.MapTask.NewOutputCollector#NewOutputCollector

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      //========关键代码=============
      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        //========关键代码============= 分区器
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

org.apache.hadoop.mapred.MapTask#createSortingCollector

private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
          createSortingCollector(JobConf job, TaskReporter reporter)
    throws IOException, ClassNotFoundException {
    MapOutputCollector.Context context =
      new MapOutputCollector.Context(this, job, reporter);

    //========关键代码==========
    Class<?>[] collectorClasses = job.getClasses(
      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
    int remainingCollectors = collectorClasses.length;
    Exception lastException = null;
    for (Class clazz : collectorClasses) {
      try {
        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
          throw new IOException("Invalid output collector class: " + clazz.getName() +
            " (does not implement MapOutputCollector)");
        }
        Class<? extends MapOutputCollector> subclazz =
          clazz.asSubclass(MapOutputCollector.class);
        LOG.debug("Trying map output collector class: " + subclazz.getName());
        MapOutputCollector<KEY, VALUE> collector =
          ReflectionUtils.newInstance(subclazz, job);
        
        //====================进行初始化===============
        collector.init(context);
        LOG.info("Map output collector class = " + collector.getClass().getName());
          
        //返回MapOutputBuffer
        return collector;
      } catch (Exception e) {
        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
        if (--remainingCollectors > 0) {
          msg += " (" + remainingCollectors + " more collector(s) to try)";
        }
        lastException = e;
        LOG.warn(msg, e);
      }
    }
    throw new IOException("Initialization of all the collectors failed. " +
      "Error in last collector was :" + lastException.getMessage(), lastException);
  }

org.apache.hadoop.mapred.MapTask.MapOutputBuffer#init

 public void init(MapOutputCollector.Context context
                    ) throws IOException, ClassNotFoundException {
      job = context.getJobConf();
      reporter = context.getReporter();
      mapTask = context.getMapTask();
      mapOutputFile = mapTask.getMapOutputFile();
      sortPhase = mapTask.getSortPhase();
      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
      partitions = job.getNumReduceTasks();
      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

      //sanity checks
      //溢写
      final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
      if (spillper > (float)1.0 || spillper <= (float)0.0) {
        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
            "\": " + spillper);
      }
      if ((sortmb & 0x7FF) != sortmb) {
        throw new IOException(
            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
      }
     //默认排序器是快速排序
      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);
      // buffers and accounting
     
      //100左移2位 ×2^20
      int maxMemUsage = sortmb << 20;
      //对16进行取余,让这个数字变成16的整数倍
      maxMemUsage -= maxMemUsage % METASIZE;
      //环形缓冲区100M
      //并且设置环形缓冲区的一些初始值
      kvbuffer = new byte[maxMemUsage];
      bufvoid = kvbuffer.length;
      kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();
      setEquator(0);
      bufstart = bufend = bufindex = equator;
      kvstart = kvend = kvindex;

      //双重索引,大家课下可以自己了解
      maxRec = kvmeta.capacity() / NMETA;
      softLimit = (int)(kvbuffer.length * spillper);
      bufferRemaining = softLimit;
      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
      LOG.info("soft limit at " + softLimit);
      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

      // k/v serialization
      //如果是自己自定义的类型,需要自定义排序器
      comparator = job.getOutputKeyComparator();
      keyClass = (Class<K>)job.getMapOutputKeyClass();
      valClass = (Class<V>)job.getMapOutputValueClass();
      //
      serializationFactory = new SerializationFactory(job);
      keySerializer = serializationFactory.getSerializer(keyClass);
      keySerializer.open(bb);
      valSerializer = serializationFactory.getSerializer(valClass);
      valSerializer.open(bb);

      // output counters
      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
      mapOutputRecordCounter =
        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
      fileOutputByteCounter = reporter
          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

      // compression
      if (job.getCompressMapOutput()) {
        Class<? extends CompressionCodec> codecClass =
          job.getMapOutputCompressorClass(DefaultCodec.class);
        codec = ReflectionUtils.newInstance(codecClass, job);
      } else {
        codec = null;
      }

      // combiner
      final Counters.Counter combineInputCounter =
        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
      combinerRunner = CombinerRunner.create(job, getTaskID(), 
                                             combineInputCounter,
                                             reporter, null);
      if (combinerRunner != null) {
        final Counters.Counter combineOutputCounter =
          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
      } else {
        combineCollector = null;
      }
      spillInProgress = false;
      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
      spillThread.setDaemon(true);
      spillThread.setName("SpillThread");
      spillLock.lock();
      try {
        spillThread.start();
        while (!spillThreadRunning) {
          spillDone.await();
        }
      } catch (InterruptedException e) {
        throw new IOException("Spill thread failed to initialize", e);
      } finally {
        spillLock.unlock();
      }
      if (sortSpillException != null) {
        throw new IOException("Spill thread failed to initialize",
            sortSpillException);
      }
    }

org.apache.hadoop.mapreduce.task.JobContextImpl#getPartitionerClass 默认是hash分区

  @SuppressWarnings("unchecked")
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

org.apache.hadoop.mapreduce.lib.partition.HashPartitioner

public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

output–>NewOutputCollector—>write

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

MapOutputBuffer

    /**
     * Serialize the key, value to intermediate storage.
     * When this method returns, kvindex must refer to sufficient unused
     * storage to store one METADATA.
     */
    public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }
      checkSpillException();
      bufferRemaining -= METASIZE;
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                //====开始溢写====================
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }

9.4 溢写Spill

output—>NewOutputCollector

  @Override
  public void close(TaskAttemptContext context
                    ) throws IOException,InterruptedException {
    try {
      //===============关键代码===============
      collector.flush();
    } catch (ClassNotFoundException cnf) {
      throw new IOException("can't find class ", cnf);
    }
    collector.close();
  }
}

collector—>MapOutputBuffer

public void flush() throws IOException, ClassNotFoundException,
       InterruptedException {
  LOG.info("Starting flush of map output");
  if (kvbuffer == null) {
    LOG.info("kvbuffer is null. Skipping flush.");
    return;
  }
  spillLock.lock();
  try {
    while (spillInProgress) {
      reporter.progress();
      spillDone.await();
    }
    checkSpillException();

    final int kvbend = 4 * kvend;
    if ((kvbend + METASIZE) % kvbuffer.length !=
        equator - (equator % METASIZE)) {
      // spill finished
      resetSpill();
    }
    if (kvindex != kvend) {
      kvend = (kvindex + NMETA) % kvmeta.capacity();
      bufend = bufmark;
      LOG.info("Spilling map output");
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
               "); kvend = " + kvend + "(" + (kvend * 4) +
               "); length = " + (distanceTo(kvend, kvstart,
                     kvmeta.capacity()) + 1) + "/" + maxRec);
      sortAndSpill();
    }
  } catch (InterruptedException e) {
    throw new IOException("Interrupted while waiting for the writer", e);
  } finally {
    spillLock.unlock();
  }
  assert !spillLock.isHeldByCurrentThread();
  // shut down spill thread and wait for it to exit. Since the preceding
  // ensures that it is finished with its work (and sortAndSpill did not
  // throw), we elect to use an interrupt instead of setting a flag.
  // Spilling simultaneously from this thread while the spill thread
  // finishes its work might be both a useful way to extend this and also
  // sufficient motivation for the latter approach.
  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw new IOException("Spill failed", e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  
  //当最后一个数据写出后,开始对溢写的小文件进行合并
  mergeParts();
  Path outputPath = mapOutputFile.getOutputFile();
  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}

(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入InputSplit中解析出一个个key/value。
(2)Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。
(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。
(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
溢写阶段详情:
步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。
步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。
步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。
(5)Merge阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。
当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。
在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并mapreduce.task.io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

9.5 Reduce

run方法

@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  //reduce的三个阶段
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  // start thread that will handle communication with parent
  TaskReporter reporter = startReporter(umbilical);
  
  boolean useNewApi = job.getUseNewReducer();
  //初始化信息
  initialize(job, getJobID(), reporter, useNewApi);

  // check if it is a cleanupJobTask
  if (jobCleanup) {
    runJobCleanupTask(umbilical, reporter);
    return;
  }
  if (jobSetup) {
    runJobSetupTask(umbilical, reporter);
    return;
  }
  if (taskCleanup) {
    runTaskCleanupTask(umbilical, reporter);
    return;
  }
  
  // Initialize the codec
  codec = initCodec();
  RawKeyValueIterator rIter = null;
  ShuffleConsumerPlugin shuffleConsumerPlugin = null;
  
  Class combinerClass = conf.getCombinerClass();
  CombineOutputCollector combineCollector = 
    (null != combinerClass) ? 
   new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

  Class<? extends ShuffleConsumerPlugin> clazz =
        job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
         
  shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
  LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

  ShuffleConsumerPlugin.Context shuffleContext = 
    new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical, 
                super.lDirAlloc, reporter, codec, 
                combinerClass, combineCollector, 
                spilledRecordsCounter, reduceCombineInputCounter,
                shuffledMapsCounter,
                reduceShuffleBytes, failedShuffleCounter,
                mergedMapOutputsCounter,
                taskStatus, copyPhase, sortPhase, this,
                mapOutputFile, localMapFiles);
    
  //=================关键代码=========================
  shuffleConsumerPlugin.init(shuffleContext);

  rIter = shuffleConsumerPlugin.run();

  // free up the data structures
  mapOutputFilesOnDisk.clear();
  
  sortPhase.complete();                         // sort is complete
  setPhase(TaskStatus.Phase.REDUCE); 
  statusUpdate(umbilical);
  Class keyClass = job.getMapOutputKeyClass();
  Class valueClass = job.getMapOutputValueClass();
  RawComparator comparator = job.getOutputValueGroupingComparator();

  if (useNewApi) {
    runNewReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

;
reducePhase = getProgress().addPhase(“reduce”);
}
// start thread that will handle communication with parent
TaskReporter reporter = startReporter(umbilical);

boolean useNewApi = job.getUseNewReducer();
//初始化信息
initialize(job, getJobID(), reporter, useNewApi);

// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}

// Initialize the codec
codec = initCodec();
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;

Class combinerClass = conf.getCombinerClass();
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);

//=关键代码=========
shuffleConsumerPlugin.init(shuffleContext);

rIter = shuffleConsumerPlugin.run();

// free up the data structures
mapOutputFilesOnDisk.clear();

sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();

if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}

shuffleConsumerPlugin.close();
done(umbilical, reporter);
}




  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-08-06 10:50:33  更:2022-08-06 10:50:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:48:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码