IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Hadoop-MapReduce框架原理 -> 正文阅读

[大数据]Hadoop-MapReduce框架原理

MapReduce框架原理

InputFormat数据输入

切片与MapTask并行度决定机制

MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。

问题:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数据,也启动 8 个 MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因素影响了 MapTask 并行度?(一个MapTask也就是一台服务器)

数据块:Block 是 HDFS 物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行
存储。

并行度决定机制如下,有多少个切片就有多少个MapReduce:

在这里插入图片描述

Job提交流程和切片流程

1、Job提交流程源码如下

waitForCompletion()

submit();
// 以下均为submit中内容
// 1 建立连接,判断是Yarn还是本地
connect();
    // 1.1 创建集群对象,创建提交 Job 的代理
    new Cluster(getConfiguration());
        // 1.1.1 对集群进行初始化,根据运行环境不同来初始化,
        // 一种是本地运行,创建的是本地对象
        // 一种是集群运行,创建的是Yarn对象
        initialize(jobTrackAddr, conf);

// 2 提交job的详细信息
submitter.submitJobInternal(Job.this, cluster)
    // 2.1 校验参数,主要是校验输出路径
    checkSpecs(job);
    // 2.2 创建给集群提交数据的Stag路径
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    // 2.3 获取jobid ,并创建Job路径
    JobID jobId = submitClient.getNewJobID();
    // 2.4 拷贝jar包到集群
    copyAndConfigureFiles(job, submitJobDir);
		// 2.4.1 提交文件
    	rUploader.uploadFiles(job, jobSubmitDir);
    // 2.5 计算切片,生成切片规划文件到上面的Stag路径中
    writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);
    // 2.6 向Stag路径写 XML 配置文件
    writeConf(conf, submitJobFile);
		// 2.6.1 写XML文件到Stag路径下,文件中是各种属性参数的值,包括job运行所有的配置信息
    	conf.writeXml(out);
    //2.7 提交 Job,返回提交状态,并删除Stag路径下的文件
    status = ubmitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

在这里插入图片描述

本地运行和Yarn上运行不同的区别是,本地运行不需要提交jar包,而Yarn上运行是需要的

FileInputFormat切片流程如下:

在这里插入图片描述

默认情况下切片最小值(minSize)是1,默认最大(maxSize)是Long的最大值;

如果是放在本地运行,那么块大小是32M,在Yarn上运行快大小是128M。

切片大小计算公式为

Math.max(minSize, Math.min(maxSize, block.size));

切片时,是一个一个文件遍历去计算和操作的。

比如129M的文件,存的时候存2块,切的时候只切1片。

FileInputFormat切片机制(input.getSplits(job))

在这里插入图片描述

在这里插入图片描述

CombineTextInputFormat切片机制

框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。

CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。因此CombineTextInputFormat切片可用于小文件处理。

虚拟存储切片最大值设置如下:

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4m

注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。

切片机制如下:

生成切片过程包括:虚拟存储过程和切片过程二部分。

1、虚拟存储过程:

将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时将文件均分成 2 个虚拟存储块(防止出现太小切片)。

例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。

2、切片过程:

(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片;

(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片;’

测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)

最终会形成 3 个切片,大小分别为:

(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M

实操:

目的:输入4个小文件合并成一个切片统一处理;

在这里插入图片描述

实现过程:

1、不做任何处理运行WordCount案例,观察切片个数为4;

在这里插入图片描述

2、在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 3。

驱动类中添加代码如下:

// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

运行如果为 3 个切片。

在这里插入图片描述

3、在 WordcountDriver 中增加如下代码,运行程序,并观察运行的切片个数为 1。

驱动中添加代码如下:

// 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 20m
CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

运行如果为 1 个切片。

在这里插入图片描述

FileInputFormat实现类

在这里插入图片描述

TextInputFormat:处理文本

KeyValueInputFormat:处理KV对数据的

NLineInputFormat:按行处理的

CombineTextInputFormat:处理小文件的

在这里插入图片描述

就可以通过实例来对比,一下是在Mapper中打印的key的情况,与原有数据的对比

// 打印如下
System.out.println(key.toString() + ", " + value.toString());

在这里插入图片描述

在这里插入图片描述

实操:

目的:统计输入文件中每一行第一个单词相同的行数

输入数据:

在这里插入图片描述

输出数据:

caocao 2
caozhi 1
liubei 2

Map阶段

1、读取数据:caocao shi wei wu di

2、设置key和value:<caocao, 1>

3、写出

Reduce阶段

1、读取数据:<caocao, 1>、<caocao, 1>

2、汇总:<caocao, 2>

Driver处理:

1、设置切割符,默认是\t及Tab分割

conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");

2、设置输入格式,默认是TextInputFormat

job.setInputFormatClass(KeyValueTextInputFormat.class);

Mapper代码如下:

/**
 * map阶段
 * 1、KEYIN 输入数据的key
 * 2、VALUEIN 输入数据的value
 * 3、KEYOUT 输出数据的key
 * 4、VALUEOUT 输出数据的value
 */
public class KVTextMapper extends Mapper<Text, Text, Text, IntWritable> {

    IntWritable v = new IntWritable(1);

    @Override
    protected void map (Text key, Text value, Context context) throws IOException, InterruptedException {

        // caocao shi wei wu di
        System.out.println(key.toString() + ", " + value.toString());

        context.write(key, v);
    }
}

Reducer代码

/**
 * reduce
 * 1、KEYIN 输入数据的key
 * 2、VALUEIN 输入数据的value
 * 3、KEYOUT 输出数据的key
 * 4、VALUEOUT 输出数据的value
 */
public class KVTextReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // <caocao,1>
        // <caocao,1>
        // 会按照String的顺序进行排序

        // 1、累加求和
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // 2、写出
        v.set(sum);
        context.write(key, v);
    }
}

Driver代码如下

public class KVTextDriver {

    public static void main (String[] args) throws Exception {

        args = new String[]{"/home/lxj/hadoop-data/input/KVText", "/home/lxj/hadoop-data/output/KVText"};

        // 1、获取Job对象
        Configuration conf = new Configuration();
        // 设置分隔符,要放在job实例化前面
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");
        Job job = Job.getInstance(conf);

        // 2、设置jar文件存储位置,即驱动类的路径
        job.setJarByClass(KVTextDriver.class);

        // 3、关联Map和Reduce类
        job.setMapperClass(KVTextMapper.class);
        job.setReducerClass(KVTextReducer.class);

        // 4、设置Mapper阶段输出数据的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5、设置最终数据输出的key和value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置InputFormat
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // 6、设置程序输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7、提交Job对象
//        job.submit();
        job.waitForCompletion(true);
    }
}

Mapper读取的数据如下:

在这里插入图片描述

最终的运行结果如下;

在这里插入图片描述
在这里插入图片描述

实操:

对每个单词进行个数统计,要求根据每个输入文件的行数来规定输出多少切片,本次使用3行作为每个切片的大小

输入数据:

caocao shi wei wu di
liubei shi shu zhao lie di
liubei shi shu guo de
caocao shi wei guo de 
caozhi shi caocao de er zi
liushan shi liu bei de er zi
caozhi ye shi caocao de er zi

输出结果为:3片

Map阶段:

1、获取一行数据;

2、切割;

3、循环写出;

Reduce阶段:

1、汇总;

2、写出;

Driver:

1、设置InputFormat类型

2、设置NlineInoutFormat为3行一切片

job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);

结果如下:

在这里插入图片描述

bei	1
caocao	4
caozhi	2
de	5
di	2
er	3
guo	2
lie	1
liu	1
liubei	2
liushan	1
shi	7
shu	2
wei	2
wu	1
ye	1
zhao	1
zi	3

在这里插入图片描述

案例实操:

无论 HDFS 还是 MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景,此时,就需要有相应解决方案。可以自定义 InputFormat 实现小文件的合并。合并的文件,在之后要处理时,启动job处理只需要处理合并之后的文件就可以了

目的:

将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。

输入数据:

wei.txt

caocao shi wei wu di
caocao shi wei guo de 
caozhi shi caocao de er zi
caozhi ye shi caocao de er zi

shu.txt

liubei shi shu zhao lie di
liubei shi shu guo de
liushan shi liu bei de er zi

wu.txt

sunquan shi wu da di
sunce shi sunquan ta ge
sunjian shi ta men de ba

输出一个SequceFile文件,它是一个二进制顺序文件。由于它的格式紧凑,很容易被压缩,因此如果输出需要作为后续的MapReduce任务的输入,这便是一个很好的输出格式。

在这里插入图片描述

InputFormat继承类

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader (InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(inputSplit, context);

        return recordReader;
    }
}

ReadRecord继承类,ReadRecord决定了本次处理怎么读取数据内容

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    Configuration configuration;
    Text key = new Text();
    BytesWritable value = new BytesWritable();
    boolean isProgress = true;

    @Override
    public void initialize (InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {

        // 初始化
        this.split = (FileSplit) inputSplit;
        this.configuration = context.getConfiguration();
    }

    /**
     * 在map阶段的run方法中会循环调用本方法
     */
    @Override
    public boolean nextKeyValue () throws IOException, InterruptedException {
        // 核心业务逻辑处理

        if (isProgress) {
            // 1、获取fs对象;
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 2、获取输入流
            FSDataInputStream fis = fs.open(path);

            // 3、拷贝
            byte[] buf = new byte[(int) split.getLength()];
            IOUtils.readFully(fis, buf, 0, buf.length);

            // 4、封装value
            value.set(buf, 0, buf.length);

            // 5、封装key
            key.set(path.toString());

            // 6、关闭资源
            IOUtils.closeStream(fs);

            isProgress = false;

            return true;
        }

        return false;
    }

    @Override
    public Text getCurrentKey () throws IOException, InterruptedException {
        
        return key;
    }

    @Override
    public BytesWritable getCurrentValue () throws IOException, InterruptedException {
        
        return value;
    }

    
    
    @Override
    public float getProgress () throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close () throws IOException {

    }
}


Mapper和Reducer就是对读取的数据进行写出或者循环写出。

Driver中关键之处在于

// 设置InputFormat
job.setInputFormatClass(WholeFileInputFormat.class);
// 设置OutputFormat
// SequenceFileOutputFormat将他的输出写成SequenceFile文件
job.setOutputFormatClass(SequenceFileOutputFormat.class);

结果如下:

在这里插入图片描述

执行过程是先执行InputFormat中的nextKeyValue(),再执行map(),然后循环执行直到 nextKeyValue返回false。
在这里插入图片描述

接下来执行reduce()。

总结一下FileInoutFormat实现类:

TextInputFormat:按照块的大小进行切片;Key为行数据偏移量;Value为行数据;

KeyValueTextInputFormat:按照块的大小进行切片;Key为根据分隔符分割之后的第一列;Value为为根据分隔符分割之后的第一列之后的数据;

NLineInputFormat:按行进行切片;Key为行数据偏移量;Value为行数据;

CombineTextInputFormat:按照设置的最大值进行切片;Key为行数据偏移量;Value为行数据;

上面的自定义InputFormat:按照块的块大小进行切片;Key为文件绝对路径;Value为文件内容;(这个可以根据需求去进行自定义)

MapReduce工作流程

在这里插入图片描述

上面的流程是整个 MapReduce 最全工作流程,但是 Shuffle 过程只是从第 7 步开始到第16 步结束,具体 Shuffle 过程详解,如下:

1、MapTask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中

2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3、多个溢出文件会被合并成大的溢出文件

4、在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序

5、ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据

6、ReduceTask 会取到同一个分区的来自不同 MapTask 的结果文件(如果大文件则存在磁盘,如果是小文件则直接在内存中处理),ReduceTask 会将这些文件再进行合并(归并排序)

7、合并成大文件后,Shuffle 的过程也就结束了,后面进入 ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对 Group(相同key的分在一组,每次取一组),调用用户自定义的 reduce()方法)

备注:

a、环形缓冲区:默认100M,存在于内存中,当其中保存的数据达到80%的时候,在空闲位置的中间开始写入数据(此时是反向写入数据),并将之前保存在内存中的数据输出到磁盘中;环形缓冲区中将会保存KV数据以及其对应的索引信息。

b、索引信息包括:key开始位置、value开始位置、分区信息、以及索引。

c、先分区,后排序,在分区内部进行排序。排序是用key进行字典顺序排序,排序方法是快速排序

d、启动ReduceTask的数量由分区决定,多少个分区就有多少个ReduceTask。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-07 11:46:23  更:2021-07-07 11:47:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 20:22:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码