文章已收录到我的Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary
写在前面
上一篇文章通过写一个WordCount学习了MapReduce的入门操作,那么这篇文章继续通过多一些例子来学习MapReduce。下面介绍几种比较常见的操作:排序,去重,求和,求平均数,TopK查询(查询排名前K名的记录)
排序
其实MapReduce会默认对Key进行升序自然排序,这显然是远远不够用的,下面我举个例子,输入的file1内容如下:
1,256
1,12
3,283
4,478
2,1001
2,3600
1,4
5,78
2,33
file2内容如下:
5,10
3,598
4,654
1,741
2,123
3,850
2,11568
1,12574
我们要的结果是根据第一个数字进行排序,如果第一个数字相同,则根据第二个数字排序,怎么玩呢?
首先我们得创建一个自定义的类,里面包括两个字段表示一行里面的第一个值和第二个值,接着实现序列化,反序列化方法,最重要是比较方法。
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class ComparableKey implements WritableComparable<ComparableKey> {
private long firstNum;
private long secondNum;
public ComparableKey() {
}
public ComparableKey(long firstNum, long secondNum) {
this.firstNum = firstNum;
this.secondNum = secondNum;
}
public long getFirstNum() {
return firstNum;
}
public void setFirstNum(long firstNum) {
this.firstNum = firstNum;
}
public long getSecondNum() {
return secondNum;
}
public void setSecondNum(long secondNum) {
this.secondNum = secondNum;
}
@Override
public int compareTo(ComparableKey otherComparableKey) {
if (firstNum == otherComparableKey.getFirstNum()) {
return (int) (secondNum - otherComparableKey.getSecondNum());
} else {
return (int) (firstNum - otherComparableKey.getFirstNum());
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(firstNum);
dataOutput.writeLong(secondNum);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
firstNum = dataInput.readLong();
secondNum = dataInput.readLong();
}
}
接着写Mapper,输入类型是Text,转换为自定义的ComparableKey类型,会自动调compareTo()方法进行比较排序。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class NumberSortMapper extends Mapper<LongWritable, Text, ComparableKey, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split(",");
long firstNum = Long.parseLong(strings[0]);
long secondNum = Long.parseLong(strings[1]);
ComparableKey comparableKey = new ComparableKey(firstNum, secondNum);
context.write(comparableKey, NullWritable.get());
}
}
Mapper已经做了排序,那么Reduce层就只需要取出来就行了。
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class NumberSortReduce extends Reducer<ComparableKey, NullWritable, LongWritable, LongWritable> {
@Override
protected void reduce(ComparableKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new LongWritable(key.getFirstNum()), new LongWritable(key.getSecondNum()));
}
}
最后再写个Main方法,作为入口:
public class NumberSort {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(NumberSort.class);
job.setMapperClass(NumberSortMapper.class);
job.setReducerClass(NumberSortReduce.class);
job.setMapOutputKeyClass(ComparableKey.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
接着把实验的文件上传上去hadoop的number_sort文件夹(自己创建的目录)。然后再执行任务,使用命令:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar NumberSort number_sort number_sort_output
执行成功后,效果如下:
1 4
1 12
1 256
1 741
1 12574
2 33
2 123
2 1001
2 3600
2 11568
3 283
3 598
3 850
4 478
4 654
5 10
5 78
去重
比如以下的这个文本,单词去重,怎么做呢?
hadoop is good
hadoop is so good
java is great
java and hadoop is very good
其实很简单,因为MapReduce输出的类型就是Map,Map的特性就是Key不能重复,于是乎我们可以把值想要去重的值放入Key,Value设置为NULL就完事了。Mapper步骤如下:
public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
Text keyOut = new Text();
String[] strings = value.toString().split(" ");
for (String str : strings) {
keyOut.set(str);
context.write(keyOut,NullWritable.get());
}
}
}
Reduce步骤不需要做其他操作,直接取值即可。
public class DistinctReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
再加个入口Main方法。
public class DistinctMain {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(DistinctMain.class);
job.setMapperClass(DistinctMapper.class);
job.setReducerClass(DistinctReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
把测试数据上传到hadoop上面。
然后执行命令如下:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar DistinctMain distinct distinct_output
输出结果如下:
and
good
great
hadoop
is
java
so
very
去重完成。
求和
比如有一道很经典的数学题,对1到100进行求和,如果用笔算很简单,可以用首尾相加法,1加99,2加98…以此类推。但是用MapReduce怎么做呢?
1
2
3
4
...
98
99
100
我们需要使用cleanup()方法,这个方法是在map方法执行完之后执行,只执行一次,看源码就明白了。
protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
}
public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
this.setup(context);
try {
while(context.nextKeyValue()) {
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
this.cleanup(context);
}
}
那么问题就很简单了,Mapper实现代码如下:
public class SumMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
private long sum = 0L;
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
long val = Long.parseLong(value.toString());
sum += val;
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
context.write(new LongWritable(sum), NullWritable.get());
}
}
Reduce实现代码如下:
public class SumReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
private long sum = 0L;
@Override
protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
sum += key.get();
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
context.write(new LongWritable(sum), NullWritable.get());
}
}
Main方法入口:
public class SumMain {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(SumMain.class);
job.setMapperClass(SumMapper.class);
job.setReducerClass(SumReduce.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
打包成jar包,上传到服务器,然后把包含1到100文本上传到HDFS,执行命令跑任务:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar SumMain sum_main.txt sum_main_out
输出结果如下:
5050
求平均数
求平均数也是很常见的操作,比如有一大堆随机生成的数字,求出平均数:
10
25
22
78
119
88
56
32
29
25
求平均数的思路其实就是总和除以个数,所以Mapper阶段的输出就是<key,1> ,代码如下:
public class AverageMapper extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(new LongWritable(Long.parseLong(value.toString())), new IntWritable(1));
}
}
第二步Reduce步骤就利用cleanup()计算平均数,计算前先计数,求和,代码如下:
public class AverageReduce extends Reducer<LongWritable, IntWritable, Text, NullWritable> {
private long sum = 0L;
private long count = 0L;
@Override
protected void reduce(LongWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int i = 0;
for (IntWritable value : values) {
i += value.get();
}
sum += (key.get() * i);
count += i;
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
BigDecimal sumBigDecimal = new BigDecimal(sum);
BigDecimal countBigDecimal = new BigDecimal(count);
BigDecimal result = sumBigDecimal.divide(countBigDecimal, 2, RoundingMode.HALF_UP);
context.write(new Text(result.toString()), NullWritable.get());
}
}
入口Main类如下:
public class AverageMain {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(AverageMain.class);
job.setMapperClass(AverageMapper.class);
job.setReducerClass(AverageReduce.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
接着还是老套路,打包上传jar包和测试用的文件,接着执行命令跑任务:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar AverageMain average_main.txt average_main_out
输出结果如下:
48.40
TopK查询
假设下面的文本,是单词以及单词出现的次数,要找出出现次数TOP5的单词,怎么做呢?
c++ 12
redis 45
java 120
Python 50
JavaScript 41
GoLang 30
Spring 30
Mybatis 11
Hibernate 6
RabbitMQ 64
Kafka 78
Nacos 46
SpringCloud 32
MySQL 100
UML 12
Seata 22
ZooKeeper 38
这里我们可以借用TreeMap 这个集合的特性,put进treeMap之后会默认从小到大自然排序,然后还提供倒序排序的方法descendingMap() 。
我写段代码示例一下吧:
public static void main(String[] args) {
TreeMap<Integer, String> treeMap = new TreeMap<>();
Random random = new Random();
for (int i = 0; i < 100; i++) {
int num = random.nextInt(100);
treeMap.put(num, num + "");
}
for (Integer num : treeMap.keySet()) {
System.out.println(num);
}
}
0
2
3
6
8
10
11
12
14
15
...
于是乎我们可以开始写代码,先写Mapper类,比较简单,就是按空格分割一下,然后输出到Reduce。
public class TopMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(" ");
String word = split[0];
long count = Long.parseLong(split[1]);
context.write(new LongWritable(count), new Text(word));
}
}
输出到Reduce之后,Reduce这边就需要收集,然后做一些处理,代码如下:
public class TopReduce extends Reducer<LongWritable, Text, Text, NullWritable> {
private TreeMap<Long, String> treeMap = new TreeMap<>();
private static final long TOP_K = 5;
@Override
protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
for (Text value : values) {
sb.append(value.toString()).append("、");
}
sb.deleteCharAt(sb.lastIndexOf("、"));
treeMap.put(key.get(), sb.toString());
if (treeMap.size() > TOP_K) {
treeMap.remove(treeMap.firstKey());
}
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
Map<Long, String> navigableMap = treeMap.descendingMap();
int i = 1;
String s;
for (Map.Entry<Long, String> entry : navigableMap.entrySet()) {
s = "排名第" + i + "位 " + entry.getValue() + "出现次数" + entry.getKey() + "次";
context.write(new Text(s), NullWritable.get());
i++;
}
}
}
最后再整个入口类Main。
public class TopMain {
public static void main(String[] args) throws Exception{
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(TopMain.class);
job.setMapperClass(TopMapper.class);
job.setReducerClass(TopReduce.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean completion = job.waitForCompletion(true);
System.exit(completion ? 0 : 1);
}
}
大功告成,然后打包上服务器,并且把测试用的文件也上传到服务器,接着执行命令跑任务:
hadoop jar /usr/local/hadoop-3.2.2/jar/hadooptest-1.0-SNAPSHOT.jar TopMain top_k_main.txt top_k_main_out
输出结果如下:
排名第1位 java出现次数120次
排名第2位 MySQL出现次数100次
排名第3位 Kafka出现次数78次
排名第4位 RabbitMQ出现次数64次
排名第5位 Python出现次数50次
总结
这篇文章主要介绍了排序,去重,求和,求平均数,TopK查询的小例子,可以加深一下对MapReduce的理解,这篇文章就讲到这里了,希望对大家有所帮助。
觉得有用就点个赞吧,你的点赞是我创作的最大动力~
我是一个努力让大家记住的程序员。我们下期再见!!!
能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!
|