IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 几个关于MapReduce的小例子 -> 正文阅读

[大数据]几个关于MapReduce的小例子

文章已收录到我的Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary

写在前面

上一篇文章通过写一个WordCount学习了MapReduce的入门操作,那么这篇文章继续通过多一些例子来学习MapReduce。下面介绍几种比较常见的操作:排序,去重,求和,求平均数,TopK查询(查询排名前K名的记录)

排序

其实MapReduce会默认对Key进行升序自然排序,这显然是远远不够用的,下面我举个例子,输入的file1内容如下:

1,256
1,12
3,283
4,478
2,1001
2,3600
1,4
5,78
2,33

file2内容如下:

5,10
3,598
4,654
1,741
2,123
3,850
2,11568
1,12574

我们要的结果是根据第一个数字进行排序,如果第一个数字相同,则根据第二个数字排序,怎么玩呢?

首先我们得创建一个自定义的类,里面包括两个字段表示一行里面的第一个值和第二个值,接着实现序列化,反序列化方法,最重要是比较方法。

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name ComparableKey
 * @date 2022-01-13 23:58
 **/
public class ComparableKey implements WritableComparable<ComparableKey> {
	//一行内容的第一个值
    private long firstNum;
	//第二个值
    private long secondNum;

    public ComparableKey() {
    }

    public ComparableKey(long firstNum, long secondNum) {
        this.firstNum = firstNum;
        this.secondNum = secondNum;
    }

    public long getFirstNum() {
        return firstNum;
    }

    public void setFirstNum(long firstNum) {
        this.firstNum = firstNum;
    }

    public long getSecondNum() {
        return secondNum;
    }

    public void setSecondNum(long secondNum) {
        this.secondNum = secondNum;
    }

    @Override
    public int compareTo(ComparableKey otherComparableKey) {
        //如果第一位数相等,则比较第二位数,从小到大排序
        if (firstNum == otherComparableKey.getFirstNum()) {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (secondNum - otherComparableKey.getSecondNum());
        } else {
            //返回大于0的数表示前面的大于后面的,小于0则表示前面的数小于后面的数
            return (int) (firstNum - otherComparableKey.getFirstNum());
        }
    }

    //序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(firstNum);
        dataOutput.writeLong(secondNum);
    }

    //反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        firstNum = dataInput.readLong();
        secondNum = dataInput.readLong();
    }
}

接着写Mapper,输入类型是Text,转换为自定义的ComparableKey类型,会自动调compareTo()方法进行比较排序。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortMapper
 * @date 2022-01-11 23:56
 **/

/**
 * Mapper有四个泛型参数需要填写
 * 第一个参数KEYIN:默认情况下,是mr框架所读到的一行文本的起始偏移量,类型为LongWritable
 * 第二个参数VALUEIN:默认情况下,是mr框架所读的一行文本的内容,类型为Text
 * 第三个参数KEYOUT:是逻辑处理完成之后输出数据的key,使用自定义的类型ComparableKey
 * 第四个参数VALUEOUT:是逻辑处理完成之后输出数据的value,在此处是次数,类型为NullWritable
 */
public class NumberSortMapper extends Mapper<LongWritable, Text, ComparableKey, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strings = value.toString().split(",");
        long firstNum = Long.parseLong(strings[0]);
        long secondNum = Long.parseLong(strings[1]);
        ComparableKey comparableKey = new ComparableKey(firstNum, secondNum);
        context.write(comparableKey, NullWritable.get());
    }
}

Mapper已经做了排序,那么Reduce层就只需要取出来就行了。

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name NumberSortReduce
 * @date 2022-01-14 00:18
 **/
public class NumberSortReduce extends Reducer<ComparableKey, NullWritable, LongWritable, LongWritable> {

    @Override
    protected void reduce(ComparableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(key.getFirstNum()), new LongWritable(key.getSecondNum()));
    }
}

最后再写个Main方法,作为入口:

public class NumberSort {

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(NumberSort.class);
        job.setMapperClass(NumberSortMapper.class);
        job.setReducerClass(NumberSortReduce.class);
        job.setMapOutputKeyClass(ComparableKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着把实验的文件上传上去hadoop的number_sort文件夹(自己创建的目录)。然后再执行任务,使用命令:

hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar NumberSort number_sort number_sort_output

执行成功后,效果如下:

1	4
1	12
1	256
1	741
1	12574
2	33
2	123
2	1001
2	3600
2	11568
3	283
3	598
3	850
4	478
4	654
5	10
5	78

去重

比如以下的这个文本,单词去重,怎么做呢?

hadoop is good
hadoop is so good
java is great
java and hadoop is very good

其实很简单,因为MapReduce输出的类型就是Map,Map的特性就是Key不能重复,于是乎我们可以把值想要去重的值放入Key,Value设置为NULL就完事了。Mapper步骤如下:

public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text keyOut = new Text();
        String[] strings = value.toString().split(" ");
        for (String str : strings) {
            keyOut.set(str);
            context.write(keyOut,NullWritable.get());
        }
    }
}

Reduce步骤不需要做其他操作,直接取值即可。

public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

再加个入口Main方法。

public class DistinctMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(DistinctMain.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

把测试数据上传到hadoop上面。

然后执行命令如下:

hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar DistinctMain distinct distinct_output

输出结果如下:

and
good
great
hadoop
is
java
so
very

去重完成。

求和

比如有一道很经典的数学题,对1到100进行求和,如果用笔算很简单,可以用首尾相加法,1加99,2加98…以此类推。但是用MapReduce怎么做呢?

1
2
3
4
...
98
99
100

我们需要使用cleanup()方法,这个方法是在map方法执行完之后执行,只执行一次,看源码就明白了。

//一般是啥事都不干,子类可以实现该方法做一些自己的事情
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    this.setup(context);
    try {
        while(context.nextKeyValue()) {
            this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        //执行完map方法后,执行cleanup()方法
        this.cleanup(context);
    }
}

那么问题就很简单了,Mapper实现代码如下:

public class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    private long sum = 0L;
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        long val = Long.parseLong(value.toString());
        sum += val;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Reduce实现代码如下:

public class SumReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {

    private long sum = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        sum += key.get();
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(sum), NullWritable.get());
    }
}

Main方法入口:

public class SumMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(SumMain.class);
        job.setMapperClass(SumMapper.class);
        job.setReducerClass(SumReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

打包成jar包,上传到服务器,然后把包含1到100文本上传到HDFS,执行命令跑任务:

hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar SumMain sum_main.txt sum_main_out

输出结果如下:

5050

求平均数

求平均数也是很常见的操作,比如有一大堆随机生成的数字,求出平均数:

10
25
22
78
119
88
56
32
29
25

求平均数的思路其实就是总和除以个数,所以Mapper阶段的输出就是<key,1>,代码如下:

public class AverageMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new LongWritable(Long.parseLong(value.toString())), new IntWritable(1));
    }
}

第二步Reduce步骤就利用cleanup()计算平均数,计算前先计数,求和,代码如下:

public class AverageReduce extends Reducer<LongWritable, IntWritable, Text, NullWritable> {

    private long sum = 0L;

    private long count = 0L;

    @Override
    protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        for (IntWritable value : values) {
            i += value.get();
        }
        sum += (key.get() * i);
        count += i;
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        BigDecimal sumBigDecimal = new BigDecimal(sum);
        BigDecimal countBigDecimal = new BigDecimal(count);
        BigDecimal result = sumBigDecimal.divide(countBigDecimal, 2, RoundingMode.HALF_UP);
        context.write(new Text(result.toString()), NullWritable.get());
    }
}

入口Main类如下:

public class AverageMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(AverageMain.class);
        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

接着还是老套路,打包上传jar包和测试用的文件,接着执行命令跑任务:

hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar AverageMain average_main.txt average_main_out

输出结果如下:

48.40

TopK查询

假设下面的文本,是单词以及单词出现的次数,要找出出现次数TOP5的单词,怎么做呢?

c++ 12
redis 45
java 120
Python 50
JavaScript 41
GoLang 30
Spring 30
Mybatis 11
Hibernate 6
RabbitMQ 64
Kafka 78
Nacos 46
SpringCloud 32
MySQL 100
UML 12
Seata 22
ZooKeeper 38

这里我们可以借用TreeMap这个集合的特性,put进treeMap之后会默认从小到大自然排序,然后还提供倒序排序的方法descendingMap()

我写段代码示例一下吧:

public static void main(String[] args) {
    TreeMap<Integer, String> treeMap = new TreeMap<>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
        //生成随机数
        int num = random.nextInt(100);
        //插入到treeMap
        treeMap.put(num, num + "");
    }
    for (Integer num : treeMap.keySet()) {
        System.out.println(num);
    }
}

//打印结果
0
2
3
6
8
10
11
12
14
15
...

于是乎我们可以开始写代码,先写Mapper类,比较简单,就是按空格分割一下,然后输出到Reduce。

public class TopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(" ");
        String word = split[0];
        long count = Long.parseLong(split[1]);
        context.write(new LongWritable(count), new Text(word));
    }

}

输出到Reduce之后,Reduce这边就需要收集,然后做一些处理,代码如下:

public class TopReduce extends Reducer<LongWritable, Text, Text, NullWritable> {

    private TreeMap<Long, String> treeMap = new TreeMap<>();

    private static final long TOP_K = 5;

    @Override
    protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append("、");
        }
        //去掉最后一个顿号
        sb.deleteCharAt(sb.lastIndexOf("、"));
        treeMap.put(key.get(), sb.toString());
        //如果大于最大长度,则删掉第一个元素,因为第一个元素是最小的
        if (treeMap.size() > TOP_K) {
            treeMap.remove(treeMap.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        //倒序
        Map<Long, String> navigableMap = treeMap.descendingMap();
        //排名
        int i = 1;
        String s;
        for (Map.Entry<Long, String> entry : navigableMap.entrySet()) {
            s = "排名第" + i + "位 " + entry.getValue() + "出现次数" + entry.getKey() + "次";
            context.write(new Text(s), NullWritable.get());
            i++;
        }
    }
    
}

最后再整个入口类Main。

public class TopMain {
    public static void main(String[] args) throws Exception{
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(TopMain.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReduce.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        //指定job 的输入文件所在的目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // 指定job 的输出结果所在的目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}

大功告成,然后打包上服务器,并且把测试用的文件也上传到服务器,接着执行命令跑任务:

hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar TopMain top_k_main.txt top_k_main_out

输出结果如下:

排名第1位 java出现次数120次
排名第2位 MySQL出现次数100次
排名第3位 Kafka出现次数78次
排名第4位 RabbitMQ出现次数64次
排名第5位 Python出现次数50次

总结

这篇文章主要介绍了排序,去重,求和,求平均数,TopK查询的小例子,可以加深一下对MapReduce的理解,这篇文章就讲到这里了,希望对大家有所帮助。

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-08 22:34:18  更:2022-03-08 22:37:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 19:54:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码