IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce的运作详解之分区 -> 正文阅读

[大数据]MapReduce的运作详解之分区

分区的概念:

????????Map阶段处理的数据,在向环形缓冲区写的时候 是以分区的方式写的。 一般情况下,MR程序分区数有多少 reduceTask数量就应该有多少 ,一个分区的数据一个reduceTask去处理,reduceTask处理完成之后都会生成一个结果文件

举个例子:

mapper阶段输出的分区是5个 但是reduceTask数量是1,能否将MR运行成功呢?

可以运行成功 只有一个文件 五个分区 但是一个reduceTask可以去处理五个分区数据

????????那么我们衍生出这么几个问题:

若分区5个 但是reduceTask是2个, 则不能运行 运行报错 不患寡而患不均

若分区5个 reduceTask5个 ,百分百可以正常运行 ----最理想的状态

若分区5个 reduceTask大于6个 ,百分百也可以运行成功,但是会多出一个空白结果文件

【注意】:以后在工作中写的 我们的reduceTask数量最好和分区数保持一致 这样的话处理才是MR程序认位的最佳状态

一、MR的默认分区(不需要自定义)

public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

????????默认分区是根据key的hashCode对reduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。默认类为HashPartitioner。

使用默认分区需要在Driver类中调用语句:

job.setNumReduceTasks(3);

二、自定义分区

??????? 默认分区机制HashPartitioner有个缺点,那就是不可控(不好控制把结果文件输出到哪里),所以我们需要自定义分区。

接下来我们做一个案例:

需求:将统计结果按照手机归属地不同省份输出到不同文件中(Partitioner)

文本数据:

?

分析

1Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据keyhashcode%reducetask的数值来分发

2)如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner

自定义一个CustomPartitioner继承抽象类:Partitioner

3)在job驱动中,设置自定义

partitioner job.setPartitionerClass(CustomPartitioner.class)

* 默认分区机制----5个区---设置5个reduceTask, 同时默认的分区机制是按照key的hashcode值

public class Driver {
    public static void main(String[] args) throws Exception {
        //定义配置项以及用来获取Job任务对象
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.100.3:9000");
        Job job = Job.getInstance(conf);
        //job关联Mapper reducer阶段
        job.setJarByClass(Driver.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        //定义我们的输入的数据路径以及输出的数据路径 以及输入的实现类
        FileInputFormat.setInputPaths(job, new Path("/phone_data.txt"));
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.10.3:9000"), conf, "root");
        Path outPath = new Path("/output3");
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        //定义不去使用默认的HashPartitioner分区  而是使用我们的自定义分区
        job.setPartitionerClass(MyPartitioner.class);
        //你的reduceTask数量必须是5
        /**
         * 你的reduceTask数量必须是5 原因就是在MR程序中一般【默认情况下】是一个分区要有一个reduceTask专门去处理
         *但是在有些情况下 reduceTask我们可能少些或者多些  这样的话会出一下奇怪的问题
         *    mapper阶段输出的分区是5个  但是reduceTask数量是1      可以运行成功 只有一个文件  五个分区 但是一个reduceTask可以去处理五个分区数据
         *                  分区5个    但是reduceTask是2个        不能运行 运行报错  不患寡而患不均
         *                  分区5个    reduceTask5个            百分百可以正常运行 ----最理想的状态
         *                  分区5个    reduceTask大于6个        百分百也可以运行成功,但是会多出一个空白结果文件
         *【注意】:以后在工作中写的  我们的reduceTask数量最好和分区数保持一致 这样的话处理才是MR程序认位的最佳状态
         */
        job.setNumReduceTasks(3);

        //提交运行
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}

/**
 * 把手机号的前三位当key  整条数据当作value输出到reducetask阶段
 */
class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        //13666666666
        String phone = fields[1];
        //拿到手机号的前三位
        String phoneThree = phone.substring(0, 3);
        context.write(new Text(phoneThree), value);
    }
}

class MyReducer extends Reducer<Text, Text, NullWritable, Text> {
    /**
     * 136   【1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	       24	27	2481	24681	200
     * ,1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	       24	27	2481	24681	200】
     *
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //第一个Text代表手机号
        Iterator<Text> iterator = values.iterator();
        while (iterator.hasNext()) {
            context.write(NullWritable.get(), iterator.next());
        }
    }
}

将文件中的手机号按照归属地不同放到不同的文件中

* 136----分区1

* 137----分区2

* 138----分区3

* 139----分区4

* 其他----分区5

必须使用自定义分区:

自定义分区机制
1、继承Partitioner这个类
2、重写里面的getPartition方法 返回值是一个int类型 返回值就是我的分区
?
??????? 继承Partitioner之后 需要区传递一个key-value键值对的泛型 代表的是我们的数据
????????那么需要传递的是map阶段输出的key-value类型 因为分区是在map阶段执行结束输出数据的时候执行的

/**
 * 自定义分区机制
 *    1、继承我们的Partitioner这个类
 *    2、重写里面的getPartition方法 返回值是一个int类型 返回值就是我的分区
 *
 *继承Partitioner之后 需要区传递一个key-value键值对的泛型 代表的是我们的数据
 * 那么需要传递的是map阶段输出的key-value类型 因为分区是在map阶段执行结束输出数据的时候执行的
 */
public class MyPartitioner extends Partitioner<Text,Text> {

    /**
     *
     * @param key  map阶段输出的key值
     * @param value  map阶段输出的value值
     * @param numReduceTasks 定义的reduceTask的任务数据 默认是1
     * @return 数字  代表的是我要将当前的这条key-value数据输送到哪个分区?
     */
    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {
        String s = key.toString();
        switch(s){
            case "136":
                return 0;
            case "137":
                return 1;
            case "138":
                return 2;
            case "139":
                return 3;
            default:
                return 4;
        }
    }
}

接下来我们在上面的Driver类里需要添加如下代码:

        job.setPartitionerClass(MyPartitioner.class);
        
        job.setNumReduceTasks(5);

注意:

????????如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx

????????如果1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception

????????如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-08 11:24:53  更:2021-08-08 11:26:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 17:17:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码