一、实验目的
- 通过实验掌握基本的MapReduce编程方法;
- 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
二、实验平台
- 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)
- Hadoop版本:3.1.3
三、实验内容
编程实现文件合并和去重操作
对于两个输入文件,即文件A和文件B,请编写MapReduce程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C。下面是输入文件和输出文件的一个样例供参考。
输入文件A的样例如下:
20150101 x
20150102 y
20150103 x
20150104 y
20150105 z
20150106 x
输入文件B的样例如下:
20150101 y
20150102 y
20150103 x
20150104 z
20150105 y
根据输入文件A和B合并得到的输出文件C的样例如下:
20150101 x
20150101 y
20150102 y
20150103 x
20150104 y
20150104 z
20150105 y
20150105 z
20150106 x
四、实验步骤
进入 Hadoop 安装目录,启动 hadoop:
cd /usr/local/hadoop
sbin/start-dfs.sh
新建文件夹,创建文件 A、B:
sudo mkdir MapReduce && cd MapReduce
sudo vim A
sudo vim B
编写 Java 文件实现 MapReduce:
sudo vim Merge.java
实现的 Java 代码如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Merge {
public static class Map extends Mapper<Object, Text, Text, Text>{
private static Text text = new Text();
public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
text = value;
context.write(text, new Text(""));
}
}
public static class Reduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context ) throws IOException,InterruptedException{
context.write(key, new Text(""));
}
}
public static void main(String[] args) throws Exception{
Configuration conf = new Configuration();
conf.set("fs.default.name","hdfs://localhost:9000");
String[] otherArgs = new String[]{"input","output"};
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in><out>");
System.exit(2);
}
Job job = Job.getInstance(conf,"Merge and duplicate removal");
job.setJarByClass(Merge.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
赋予用户相关权限:
sudo chown -R hadoop /usr/local/hadoop
添加编译所需要使用的 jar 包:
vim ~/.bashrc
添加下面一行到文件的最后:
export HADOOP_HOME=/usr/local/hadoop
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH
使更改立即生效:
source ~/.bashrc
编译 Merge.java:
javac Merge.java
打包生成的 class 文件为 jar 包:
jar -cvf Merge.jar *.class
创建 Hadoop 主目录为 /user/hadoop 并创建 input 文件夹:
/usr/local/hadoop/bin/hdfs dfs -mkdir -p /user/hadoop
/usr/local/hadoop/bin/hdfs dfs -mkdir input
上传 A、B 文件到 input 文件夹中:
/usr/local/hadoop/bin/hdfs dfs -put ./A input
/usr/local/hadoop/bin/hdfs dfs -put ./B input
使用我们刚生成的 Merge.jar 包:
/usr/local/hadoop/bin/hadoop jar Merge.jar Merge
查看输出结果:
/usr/local/hadoop/bin/hdfs dfs -cat output/*
输出如下:
hadoop@fzqs-Laptop:/usr/local/hadoop$ hdfs dfs -cat output/*
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x
hadoop@fzqs-Laptop:/usr/local/hadoop$
此外,有想用 Python 写的可以参考我这篇博客:实验5 MapReduce初级编程实践(Python实现)
|