IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MR自定义分组获取TopN -> 正文阅读

[大数据]MR自定义分组获取TopN

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 继承WritableComparator类,重写compare 方法   相同的订单ID认为相同
 */
public class MyGroupCompactor extends WritableComparator {

    /**
     * K1会根据这个规则判断是否需要合并
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        return first.getOrder_id().compareTo(second.getOrder_id());
    }

    /**
     * 设置分组类
     */
    public MyGroupCompactor() {
        super(OrderBean.class,true);
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 自定义我们的订单类   实现序列化和排序方法
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String order_id;
    private Double price;

    /**
     * 定义排序方法,如果订单相同,用价格排序,不同没有排序的必要
     */
    @Override
    public int compareTo(OrderBean orderBean) {
        //如果订单号相同比较价格,否则比较无意义
        if (this.order_id.compareTo(orderBean.getOrder_id())==0) {
            return this.price.compareTo(orderBean.getPrice());
        }
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(order_id);
        dataOutput.writeDouble(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.order_id = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public String getOrder_id() {
        return order_id;
    }

    public void setOrder_id(String order_id) {
        this.order_id = order_id;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    /**
     * 重写toString方便reduceTast输出
     */
    @Override
    public String toString() {
        return this.order_id + "\t" + this.price;
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * MAP端获取数据,并以订单类的形式存储,输出orderBean类,和订单价格
 * 当输出时,K1合并会以MyGroupCompactor类的比较规则进行合并成K2
 * V1会以类内的排序方法,组合进入集合V2
 */
public class MyGroupMap extends Mapper<LongWritable, Text, OrderBean, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, DoubleWritable>.Context context) throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrder_id(splits[0]);
        orderBean.setPrice(Double.parseDouble(splits[2]));
        context.write(orderBean,new DoubleWritable(Double.parseDouble(splits[2])));
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reduce端如果  相同K1合并,K2以数组连接,此时数组内为相同订单的价格
 */
public class MyGroupReduce extends Reducer<OrderBean, DoubleWritable,OrderBean,DoubleWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<DoubleWritable> values, Reducer<OrderBean, DoubleWritable, OrderBean, DoubleWritable>.Context context) throws IOException, InterruptedException {

        /**
         * 获取TopN只需要写循环几次即可
         */
        int i = 0;
        for (DoubleWritable value: values) {
            i++;
            if(i<=2){
                context.write(key,value);
            }else {
                break;
            }
        }


    }


}

package com.cn.demo_groupTopN;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 自定义分区:如果订单号相同分到同一个reduce里面
 */
public class MyPartion extends Partitioner<OrderBean, DoubleWritable> {
    @Override
    public int getPartition(OrderBean orderBean, DoubleWritable doubleWritable, int i) {
        /**
         * 此处的与&是位运算,返回一个二进制数,再除以分区数取余,得到应该去的分区位置
         */
        return (orderBean.getOrder_id().hashCode() & Integer.MAX_VALUE)%i;
    }
}

package com.cn.demo_groupTopN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyGroupMain extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        Job job = Job.getInstance(super.getConf(),"group_demo");
        job.setJarByClass(MyGroupMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\5、大数据离线第五天\\5、大数据离线第五天\\自定义groupingComparator\\input"));

        job.setMapperClass(MyGroupMap.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        //设置分区类
        job.setPartitionerClass(MyPartion.class);
        //设置分区数量
        job.setNumReduceTasks(2);

        //设置分组类
        job.setGroupingComparatorClass(MyGroupCompactor.class);

        //设置reduce类
        job.setReducerClass(MyGroupReduce.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(DoubleWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\5、大数据离线第五天\\5、大数据离线第五天\\自定义groupingComparator\\output_TOPN"));

        boolean b = job.waitForCompletion(true);
        return b?0:1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(),new MyGroupMain(),args);
        System.exit(run);
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-08 13:53:00  更:2021-12-08 13:54:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 5:54:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码