IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Mapreduce排序器对象获取规则源码分析 -> 正文阅读

[大数据]Mapreduce排序器对象获取规则源码分析

写在前面

本篇博客背景

? 在使用Mapreduce程序时,不难看出Map阶段的输出键值对其实是默认按照字典排序的,而我们在使用该程序的时候也会因为需要满足某种需求自定义排序规则。当前我们有两种实现自定义排序的方式:


两种实现自定义排序的方式

  1. 直接让参与排序的对象实现WritableComparable接口,并在接口中实现compareTo方法,当运行的时候hadoop会自动帮助我们生成WritableComparator对象。

    具体可见下例:

    phoneBean.java

    package com.yxp.complication.exercise.sorttest;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    /**
     * @author 尤小鹏
     * 切忌一味模仿!
     * 2021  11 14  17:50
     * description:
     */
    public class PhoneBean implements WritableComparable<PhoneBean> {
    
        public int upFlow;
        public int downFlow;
        public int sumFlow;
    
        public int getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
    
        public int getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
    
        public int getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow() {
            this.sumFlow=upFlow+downFlow;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
    
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
    
           this.upFlow = in.readInt();
           this.downFlow=in.readInt();
            this.sumFlow=in.readInt();
        }
    
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow ;
        }
    
        //降序
        @Override
        public int compareTo(PhoneBean o) {
            return -(this.sumFlow-o.sumFlow);
        }
    }
    
    
  2. 自定义一个比较器对象,继承WritableComparator类并重写compare方法,在该方法中定义比较规则,注意在自定义的比较器对象中通过调用父类的super方法将自定义的比较器对象和要参与比较的对象关联。最后在Driver类中设置该比较器类为作业的比较器类。

具体可见下例:

PhoneBeanComparator.java

package com.yxp.complication.exercise.sorttest;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * @author 尤小鹏
 * 切忌一味模仿!
 * 2021  11 15  8:56
 * description:
 */
public class PhoneBeanComparator extends WritableComparator {

    public PhoneBeanComparator() {
        super(PhoneBean.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        PhoneBean abean = (PhoneBean) a;
        abean.setSumFlow();

        PhoneBean bbean = (PhoneBean) b;
        bbean.setSumFlow();
        return -(abean.sumFlow - bbean.sumFlow);
    }
}


我的疑问

疑问便是第一种实现方式是如何得到排序器对象从而排序的,因为它根本没有实例化啊!虽然结论是Hadoop帮他构造的,但是只知道这个未免有些粗浅了,因此我们要找到源头!

探索底层规则,记得找源码,看源码,理解源码。


思路展开

我们都知道使用job对象可以设置提交作业的各种参数,如:Mapper类、Reducer类、Map端的输出键值对类型,程序的输出键值对类型……由此自然而然的联想到程序的比较器是不是也可以在这里设置呢?

寻找过程

getSortComparator()–>

我尝试着使用job.comparator然后聪明的IEDA就为我找到了下面的这个方法。

image-20211115111247356

ctrl+鼠标左键进入该方法,方法体如下:

 /**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator<?> getSortComparator() {
    return conf.getOutputKeyComparator();
  }

第二行描述足够明显,于是点进7行中的getOutputKeyComparator()方法——方法体如下:

/**
   * Get the {@link RawComparator} comparator used to compare keys.
   * 
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

在这里逻辑已经相对具体了,我们把聚焦在getClass()的三个参数上。

第一个:JobContext.KEY_COMPARATOR,关于这个变量的定义是这样的:

public static final String KEY_COMPARATOR = "mapreduce.job.output.key.comparator.class";

第二个:null,作为第一个变量的默认值。

第三个:RawComparator.class,直接对对象的字节表示进行操作的Comparator 。

它出现在这里是什么意思呢?或者说:

getClass()的三个参数代表什么意义呢?还是要走进去看一看。

   * @param name the class name. (类名)
   * @param defaultValue default value.(默认值)
   * @param xface the interface implemented by the named class.(由类名实现的接口)
   * @return property value as a <code>Class</code>, 
   *         or <code>defaultValue</code>.
   */
  public <U> Class<? extends U> getClass(String name, 
                                         Class<? extends U> defaultValue, 
                                         Class<U> xface) 

所以我们可以理解了,getClass()的功能是获取比较器的类,如果用户设置了就使用设置的类,没有设置的话值为null。

那么下面的代码看起来也相对好理解一些了。

1-2行:如果不为null(设置了比较器的类),就使用反射的工具类,将这个类实例化。

3行:返回WritableComparator的get()方法的返回值。这是建立在上面的判断不成立的情况,也就是说如果我们没有设置比较器类就会调用该语句,我仿佛接近真相了。

该方法的两个参数:

  1. getMapOutputKeyClass().asSubclass(WritableComparable.class)作用是

    获取Map的输出key的类,将其作为WritableComparable类的子类

  2. this表示是该作业的conf对象。

 if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);

走进get方法体——>

 /** Get a comparator for a {@link WritableComparable} implementation. */
  public static WritableComparator get(
      Class<? extends WritableComparable> c, Configuration conf) {
      
      //从比较器集合(comparators)中获取参数对应的比较器comparator,但是当前比较的类型得是Hadoop自带的数据类型,否则是拿不到的。
      
    WritableComparator comparator = comparators.get(c);
      
      //两个判断是否为null,第一个判断式为了防止一些极端情况可能会发生GCC回收机制导致比较器被回收
      
    if (comparator == null) {
        
        //如果被回收了,强制初始化
        
      forceInit(c);
      
        //再获取一次构造器
        
      comparator = comparators.get(c);
      
        //如果还是没有,说明排序的对象不是hadoop自带的数据类型,比如我们自己建的对象。
        
      if (comparator == null) {
      
          //为它创造一个WritableComparator对象实例,使第二种实现方式有了等同第一种实现方式的效果。
          
        comparator = new WritableComparator(c, conf, true);
      
      }
        
    }
      //为该构造器对象传入方法里传入的conf对象。
      
    ReflectionUtils.setConf(comparator, conf);
    return comparator;
  }

总结

Mapreduce程序在对Key排序时,

如果已经给Job对象设置好了比较器的类,会为它创建实例并按照其中的排序规则对Key进行排序,此为第二种自定义排序实现方式的逻辑。

否则,会从Hadoop自带的数据类型对应的比较器集合中寻找当前需要比较对象的比较器,

?	如果没有找到,为了防止极端情况发生导致原本该存在的比较器被回收,会进行一次强制初始化,然后再从集合中寻找它。

?		这一次如果再找不到,会判断该对象不是Hadoop自带的数据类型,如果需要比较的类实现了WritableComparable接口,则为它创造一个WritableComparator对象实例,使第二种实现方式有了等同第一种实现方式的效果。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-16 18:54:27  更:2021-11-16 18:54:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 5:28:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码