IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce知识记录 -> 正文阅读

[大数据]MapReduce知识记录

目录

MapReduce设计理念:

MapReduce的计算流程:

数据块Block:

切片Split:

MapTask:

环形数据缓冲区,KvBuffer:

分区Partation:

排序Sort:

溢写Spill:

合并Merge:

组合器Combiner:

拉取Fetch:

合并Merge:

归并Reduce:

写出到Output:

MapReduce的架构特点:

MapReduce1.X:

Client:

JobTracker:

TaskTracker:

Slot(槽):

Task(MapTask-->ReduceTask):

缺点:

MapReduce2.X:

Client:

ResourceManager:

Yarn(NodeManager):

Container:

ApplicationMaster:

Task(MapTask--ReduceTask):

额外补充点:


MapReduce设计理念:

映射map与归纳reduce。

mapreduce必须构建在hdfs之上的一种大数据离线计算框架。mapreduce不会马上得到结果,有一定的延时,适合大数据量。

原始数据--->map(Key,Value)--->Reduce

特点:

????????分布式计算,将大的数据分成很多小数据,交给更多的节点参与运算。

????????计算想数据靠拢,将计算传递给有数据的节点上进行工作。

MapReduce的计算流程:

数据块Block:

hdfs上数据存储的一个单元,同一个文件中块的大小都是相同的。

因为数据存储到hdfs上不可变,所以有可能块的数量和集群的计算能力不匹配。

我们需要一个动态调整本次参与计算几点数量的单位。

切片Split:

是一个逻辑概念,在不改变现在数据存储的情况下,可以控制参与计算的节点数目,通过切片大小可以达到控制计算节点数量的目的(有多少个切片就会有多少个Map任务),一般切片大小为Block的整数倍,这样是防止多余创建和很多的数据连接。

如果Split大小>Block大小,计算节点就少了,如果Split大小<Block大小,计算节点就多了。默认情况下,Split切片的大小等于Block的大小,默认是128M。

MapTask:

map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中。也可以更具自己书写的分词逻辑(例如空格)计算每个单词出现的次数。这时就会产生Map(<String,Integer>)临时数据,存放到内存中。

但是内存的大小是有限度的,如果多个任务同时执行,有可能内存溢出。但是如果吧数据都直接存放在硬盘,效率太低,因此一般先往内存中写一部分,达到阈值再写出到硬盘。

环形数据缓冲区,KvBuffer:

可以循环利用这块内存区域,减少数据溢写时map的停止时间。

????????每一个Map可以独享一个内存区域。

????????在内存中构建一个环形数据缓冲区,默认为100M。

????????设置缓冲区的阈值为80%,当缓冲区的数据达到80M时,开始往外溢写到硬盘,设立阈值是为了同时写入和写出没有间隔。

????????溢写的时候还有20M的空间可以被试用,效率并不会被减缓。

????????而且将数据循环写到硬盘,不用担心OOM问题。

分区Partation:

根据Key直接计算出对应的Reduce。

分区的数量和Reduce的数量是相等的。hash(key) % partation = num

默认分区的算法是Hash,然后取余:

????????Object的hashCode()---equals()

????????如果两个对象equals,那么两个对象的hashcode一定相等。

????????如果两个对象的hashcode相等,但是两个对象一定equals。

排序Sort:

对要溢写的数据进行排序(QuickSort)。

按照先Partation然后Key的顺序排序,相同的分区在一起,相同的Key在一起。

将来溢写的小文件也是有序的。

溢写Spill:

将内存中的数据循环写到硬盘,不用担心OOM问题。

每次都会产生一个80M的文件。

如果本次Map产生的数据较多,有可能会溢写多个文件。

合并Merge:

因为溢写会产生很多有序(分区key)的小文件,而且小文件的数目不确定。后面向reduce传递数据会带来一些问题,所以将小文件合并成一个大文件,将来拉去的数据直接从大文件拉取即可。

合并小文件的同时会进行排序(归并排序),最终产生一个有序的大文件。

组合器Combiner:

集群的带宽限制了mapreduce作业的数量,因此应该尽量避免map和reduce任务之间的数据传输,hadoop允许用户对map的输出数据进行处理,用户可自定义combiner函数(如同map函数和reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner的输出作为reduce的输入,很多情况下可以i直接将reduce函数作为conbiner函数来试用(job.setCombinerClass(FlowCountReducer.class))。

combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件时调用combiner函数,也可以在溢出的小文件合并成大文件时调用combiner,但是要保证不管调用多少次,combiner函数都不影响最终的结果,所以不是所有处理逻辑都可以i使用combiner组件,有些逻辑如果试用了conbiner函数会改变最后reduce的输出结果(如求几个数的平均值,就不能先用conbiner求一次各个map输出结果的平均值,再求这些平均值的平均值,那样会导致结果的错误)。

combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量:

????????原先传给reduce的数据时a1 a1 a1 a1 a1

????????第一次combiner组合后变成a(1,1,1,1,1)

????????第二次combiner后传给reduce的数据变为a(5,5,6,7,23,...)

拉取Fetch:

我们要将Map的临时结果拉去到Reduce节点。

原则:

????????相同的Key必须拉去到同一个Reduce节点,但是同一个Reduce节点可以有多个Key。

拉去数据的时候必须对Map产生的最终合并文件做全序遍历,而且每一个reduce都要做一个全序遍历,如果map产生的大文件时有序的,每一个reduce只需要从文件中读取自己所需的即可。

合并Merge:

因为reduce拉去的时候,会从多个map拉取数据。

每个map都会产生一个小文件,这些小文件之间无序,文件内部有序。

为何方便计算(没必要读取N个小文件),需要合并文件。

归并算法合并成1个。

相同的key都在一起。

归并Reduce:

将文件中的数据读取到内存中,一次性将相同的key全部读取到内存中,直接将相同的key得到结果--->最终结果。

写出到Output:

每个reduce将自己计算的最终结果都存放在HDFS上。

MapReduce的架构特点:

MapReduce1.X:

Client:

客户端发送MR任务到集群,其中客户端有很多种类,例如hadoop jar

JobTracker:

接收客户端的MR任务。然后选择一个资源丰富的节点,给这个任务分配资源,执行对应的任务。与TaskTracker保持心跳,接收汇报信息。

TaskTracker:

保持心跳,汇报资源(当前机器内存,当前机器任务数)。

当分配资源了后,开始在本机分配对应的资源给Task。并且实时监控任务的执行并汇报。

Slot(槽):

属于JobTracker分配的资源(计算能力、IO能力等)。

不管任务大小,资源是恒定的,不灵活但是好管理。

Task(MapTask-->ReduceTask):

开始按照MR的流程执行业务。

当任务完成时,JobTracker告诉TaskTracker回收资源。

缺点:

单点故障

内存扩展

业务瓶颈

只能执行MR操作

如果其他框架需要运行在Hadoop上,需要独立开发自己的资源调度框架。

MapReduce2.X:

Client:

客户端发送MR任务到集群,其中客户端有很多种类,例如hadoop jar

ResourceManager:

资源协调框架的管理者,分为主节点和备用节点(防止单点故障,主备的切换基于ZK的管理),它时刻与NodeManager保持心跳,接受NodeManager的汇报(NodeManager当前节点的资源情况)。

当有外部框架要使用资源的时候直接访问ResourceManager即可。

如果是有MR任务,先去ResourceManager申请资源,ResourceManager根据汇报分配资源,例如资源在NodeManager1,那么NodeManager1要负责开辟资源。

Yarn(NodeManager):

Yarn(Yet Another Resource Negotiator,另一种资源协调者),统一管理资源。以后其他的计算框架可以直接访问yarn获取当前集群的空闲节点。

每个DataNode上默认有一个NodeManager,NodeManager汇报自己的信息到ResourceManager。

Container:

它是动态分配的,2.X资源的代名词。

ApplicationMaster:

我们本次任务的主导者,负责调度本次被分配的资源Container。当所有的节点任务全部完成,applicaion告诉ResourceManager请求杀死当前ApplicationMaster线程,本次任务的所有资源都会被释放。

Task(MapTask--ReduceTask):

开始按照MR的流程执行业务,当任务完成时,ApplicationMaster接收当前节点的反馈。

额外补充点:

MR中Map Task的工作机制:

简单概述:

inputFile通过split被切割为多个split文件,通过Record按行读取内容给map(自己写的处理逻辑的方法) ,数据被map处理完之后交给OutputCollect收集器,对其结果key进行分区(默认使用的hashPartitioner),然后写入buffer,每个map task 都有一个内存缓冲区(环形缓冲区),存放着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式溢写到磁盘,当整个map task 结束后再对磁盘中这个maptask产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task的拉取。

详细步骤:

1、读取数据组件 InputFormat (默认 TextInputFormat) 会通过 getSplits 方法对输入目录中的文件进行逻辑切片规划得到 block,有多少个 block就对应启动多少个 MapTask。

2、将输入文件切分为 block 之后,由 RecordReader 对象 (默认是LineRecordReader) 进行读取,以 \n 作为分隔符, 读取一行数据, 返回 <key,value>, Key 表示每行首字符偏移值,Value 表示这一行文本内容。

3、读取 block 返回 <key,value>, 进入用户自己继承的 Mapper 类中,执行用户重写的 map 函数,RecordReader 读取一行这里调用一次。

4、Mapper 逻辑结束之后,将 Mapper 的每条结果通过 context.write 进行collect数据收集。在 collect 中,会先对其进行分区处理,默认使用 HashPartitioner。

5、接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区(默认100M),缓冲区的作用是 批量收集 Mapper 结果,减少磁盘 IO 的影响。我们的 Key/Value 对以及 Partition 的结果都会被写入缓冲区。当然,写入之前,Key 与 Value 值都会被序列化成字节数组。

6、当环形缓冲区的数据达到溢写比列(默认0.8),也就是80M时,溢写线程启动,需要对这 80MB 空间内的 Key 做排序 (Sort)。排序是 MapReduce 模型默认的行为,这里的排序也是对序列化的字节做的排序。

7、合并溢写文件,每次溢写会在磁盘上生成一个临时文件 (写之前判断是否有 Combiner),如果 Mapper 的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行 Merge 合并,因为最终的文件只有一个写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

MR中Reduce Task的工作机制:

简单描述:

Reduce 大致分为 copy、sort、reduce 三个阶段,重点在前两个阶段。

copy 阶段包含一个 eventFetcher 来获取已完成的 map 列表,由 Fetcher 线程去 copy 数据,在此过程中会启动两个 merge 线程,分别为 inMemoryMerger 和 onDiskMerger,分别将内存中的数据 merge 到磁盘和将磁盘中的数据进行 merge。待数据 copy 完成之后,copy 阶段就完成了。

开始进行 sort 阶段,sort 阶段主要是执行 finalMerge 操作,纯粹的 sort 阶段,完成之后就是 reduce 阶段,调用用户定义的 reduce 函数进行处理。

详细步骤:

1、Copy阶段:简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件(map task 的分区会标识每个map task属于哪个reduce task ,默认reduce task的标识从0开始)。

2、Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

(merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就直接启动内存到磁盘的merge。与map端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。内存到磁盘的merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。)。

3、合并排序:把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

4、对排序后的键值对调用reduce方法:键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

MR中的Shuffle阶段简述:

shuffle阶段分为四个步骤:依次为:分区,排序,规约,分组,其中前三个步骤在map阶段完成,最后一个步骤在reduce阶段完成。

shuffle 是 Mapreduce 的核心,它分布在 Mapreduce 的 map 阶段和 reduce 阶段。一般把从 Map 产生输出开始到 Reduce 取得数据作为输入之前的过程称作 shuffle。Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,? 参数:mapreduce.task.io.sort.mb? 默认100M

1、Collect阶段:将 MapTask 的结果输出到默认大小为 100M 的环形缓冲区,保存的是 key/value,Partition 分区信息等。

2、Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了 combiner,还会将有相同分区号和 key 的数据进行排序。

3、MapTask阶段的Merge:把所有溢出的临时文件进行一次合并操作,以确保一个 MapTask 最终只产生一个中间数据文件。

4、Copy阶段:ReduceTask 启动 Fetcher 线程到已经完成 MapTask 的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。

5、ReduceTask阶段的Merge:在 ReduceTask 远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。

6、Sort阶段:在对数据进行合并的同时,会进行排序操作,由于 MapTask 阶段已经对数据进行了局部的排序,ReduceTask 只需保证 Copy 的数据的最终整体有效性即可。

Shuffle阶段的数据压缩机制:

在shuffle阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。

hadoop当中支持的压缩算法:

gzip、bzip2、LZO、LZ4、Snappy,这几种压缩算法综合压缩和解压缩的速率,谷歌的Snappy是最优的,一般都选择Snappy压缩。谷歌出品,必属精品。

写MR时,什么情况下可以使用规约:

规约(combiner)是不能够影响任务的运行结果的局部汇总,适用于求和类,不适用于求平均值,如果reduce的输入参数类型和输出参数的类型是一样的,则规约的类可以使用reduce类,只需要在驱动类中指明规约的类即可。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-06 13:53:32  更:2022-02-06 13:55:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:41:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码