IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce-day04-第三章框架原理-3.3Shuffle机制 -> 正文阅读

[大数据]MapReduce-day04-第三章框架原理-3.3Shuffle机制

1:shuffle机制

shuffle是处于Map方法之后,Reduce方法之前的数据处理过程。

?2:Partition分区

? ? ? ? 1:要求将统计结果按照条件输出到不同的文件中(分区)。将统计结果按照手机归属地不同省份输出到不同文件中(分区)。

? ? ? ? 2:默认的Partition分区

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        // 相当于key的hashcode值与numReduceTasks取余得到。
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

? ? ? ? 3:自定义Partitioner步骤

# 1.自定义类继承Partitioner,重写getPartition方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 控制分区代码逻辑
        … …
        return partition;
    }
}

# 2.在job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);

# 3.自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);

3:自定义分区案例

? ? ? ? 1:将统计结果按照手机归属地不同省份输出到不同文件中(分区)

????????2:输入的数据phone_data.txt

????????3:期望输出数据:136、137、138、139开头都放到一个独立的4个文件中,其他开头的放在一个文件中。

? ? ? ? 4:136对应分区0,137对应分区1,138对应分区2,139对应分区3,其他对应分区4????????

? ? ? ? 5:Driver类中,// 指定自定义数据分区 job.setPartitionerClass(ProvincePartitioner. class);

??????????????????????????????????// 同时指定相应数量的reduceTask job.setNumReduceTasks(5);

? ? ? ? 6:代码

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        //获取手机号前三位 prePhone
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定义一个分区号变量 partition,根据 prePhone 设置分区号
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }
        // 上卖弄partition参数的设置,需要连续,不能有空
        //最后返回分区号 partition
        return partition;
    }
}

# 在driver类中添加如下代码
//8 指定自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);
//9 同时指定相应数量的 ReduceTask
 // 这个数值的设置,设置2-4都会报IO异常,设置5走自定义类,设置6会多出来一个空文件,设置1只有会有个分区文件
job.setNumReduceTasks(5);

4:排序

? ? ? ? MapTask:一次快排,在环形缓冲区溢写前? ,对key的索引按照字典的顺序(a、b、c...)进行排序,进行分区内的快排,一次归并,对溢写文件进行归并,溢写文件是区内有序,因为快排,归并排序是把所有溢写文件中同属于一个分区的所有数据进行排序,使得整个分区一内排好顺序。

? ? ? ? ReduceTask:一次归并排序。例如Reduce Task1只负责partition0内的数据,但是MapTask1和MapTask2都有partition0的数据,所以ReduceTask1会主动把这两个MapTask内的partition0数据进行拉取,然后进行归并排序,排成只有一个文件。?

? ? ? ? 一定要进行排序,因为最后到Reducer的时候,是相同key为一组进行操作的,事先排好序的话,效率会比较高。?

? ? ? ? 1:部分排序。MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序

? ? ? ? 2:全排序。最终输出结果只有一个文件,且文件内部有序。

? ? ? ? 3:二次排序。在自定义排序中,如果compareTo中判断条件为两个即为二次排序。

5:全排序案例以及二次排序案例(使用之前的序列化案例,二次排序是在总流量相同的情况下,设置上行流量按照升序的方式排列,在compareTo方法里面设置就好)

# 根据手机总流量进行降序
# 输入数据,既然对流量进行排序,就把流量认为key
137    2481    24681    27162
138    264     0        264
139    132     1512     1644
135    7355    110349   117684

# 输出数据
135    7355    110349    117684
137    2481    24681     27162
139    132     1512      1644
138    264     0         264   

# FlowBean实现WritableCompareable接口重写compareTo方法
# public class FlowBean implements WritableComparable<FlowBean>
@Override
public int compareTo(FlowBean o) {
    // 倒序排列,按照总流量从大到小
    if(this.sumFlow > o.getSunFlow){
        return -1;
    }else if(this.sunFlow < o.sumFlow){
        return 1;
    }else{
        //按照上行流量的正序
        if(this.upFlow > o.upFlow){
            return 1
        }else if(this.upFlow < o.upFlow){
            return -1;
        }else{
            return 0;
        }
    }
    
}

# Mapper,注意FlowBean类,mapper,reducer,driver的参数进行改变,因为key value变了
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    private FlowBean outK = new FlowBean();
    private Text outV = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException {
        //1 获取一行数据
        String line = value.toString();
        //2 按照"\t",切割数据
        String[] split = line.split("\t");
        //3 封装 outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);
        //4 写出 outK outV
        context.write(outK,outV);
    }
}
# Reducer类
// 循环输出,避免总流量相同情况
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context 
context) throws IOException, InterruptedException {
    //遍历 values 集合,循环写出,避免总流量相同的情况
    for (Text value : values) {
        //调换 KV 位置,反向写出
        context.write(value,key);
    }
    }
}

#driver类
//4 修改设置 Map 端输出数据的 KV 类型
 job.setMapOutputKeyClass(FlowBean.class);
 job.setMapOutputValueClass(Text.class);

6:区排序(分区加排序的共同使用,分区类也要注意key和value是什么)

# 根据上面案例的输出情况,把136、137、138等等开头的手机号放在不同的文件夹中,同时各个文件中也 # 按照总流量进行排序,这样其实就是分区加排序的综合使用

# 分区类
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {
    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        //获取手机号前三位
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);
        //定义一个分区号变量 partition,根据 prePhone 设置分区号
        int partition;
        if("136".equals(prePhone)){
            partition = 0;
        }else if("137".equals(prePhone)){
            partition = 1;
        }else if("138".equals(prePhone)){
            partition = 2;
        }else if("139".equals(prePhone)){
            partition = 3;
        }else {
            partition = 4;
        }
        //最后返回分区号 partition
        return partition;
    }
}

# 在driver类添加,进行绑定
job.setPartitionerClass(ProvincePartitioner2.class);
// 设置对应的 ReduceTask 的个数
job.setNumReduceTasks(5);

7:Combiner合并

? ? ? ? 1:Combiner可有可无,但是为了考虑效率问题,一般需要使用

? ? ? ? 2:Combiner父类是Reducer

? ? ? ? 3:Combiner和Reducer区别在于运行的位置:Combiner只负责一个MapTask,Reducer负责全局所有的MapTask

? ? ? ? 4:Combiner意思是对每一个MapTask的输出进行局部汇总,减少传输量。

? ? ? ? 5:Combiner可以用的前提是不能影响最终的业务逻辑(算平均值不可,求和可以),且Combiner的输出kv要和Reducer的输入kv对应起来。

Mapper                                Reducer
3    5    7    ->    (3+5+7)/3=5      (3+5+7+2+6)/5=23/5  不等于  (5+4)/2=9/2  
2    6         ->    (2+6)/2=4

               ->    3+5+7=15         (15+8)=23  等于  (3+5+7+2+6)=23
               ->    2+6=8 

8:Combiner案例?(wordcount案例基础之上)

# 输入数据                    希望在mapper阶段即可得到这个
banzhang    ni    hao        <banzhang,4>
xihuan    hadoop             <ni,2>   
banzhang                     <hao,2>      
banzhang    ni    hao        <xihuan,2>    
xihuan    hadoop             <hadoop,2>   
banzhang

# 期望结果,在日志文件中可以看到combine input records = 12  combine  output  records = 5
# 方案一:新建WordCountCombiner类
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context 
context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        //封装 outKV
        outV.set(sum);
        //写出 outKV
        context.write(key,outV);
    }
}

// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);
# 如果设置 job.setNumReduceTasks(0),即取消Reduce阶段,那么shuffle整个都不会执行,因为
# shuffle是连接map和reduce的,所以combiner也不会执行。 

# 方案二:因为上面类的代码和Reducer代码一样
// 指定需要使用 Combiner,以及用哪个类作为 Combiner 的逻辑
job.setCombinerClass(WordCountReducer.class);

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:11:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/26 22:44:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码