MapReduce操作
实验环境
dataset数据集NASA_log_sample.txt文本内容:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074
unicomp6.unicomp.net - - [01/Jul/1995:00:00:14 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
unicomp6.unicomp.net - - [01/Jul/1995:00:00:14 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
unicomp6.unicomp.net - - [01/Jul/1995:00:00:14 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
d104.aa.net - - [01/Jul/1995:00:00:15 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
d104.aa.net - - [01/Jul/1995:00:00:15 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
d104.aa.net - - [01/Jul/1995:00:00:15 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
129.94.144.152 - - [01/Jul/1995:00:00:17 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:17 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1713
ppptky391.asahi-net.or.jp - - [01/Jul/1995:00:00:18 -0400] "GET /facts/about_ksc.html HTTP/1.0" 200 3977
net-1-141.eden.com - - [01/Jul/1995:00:00:19 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0916.jpg HTTP/1.0" 200 34029
ppptky391.asahi-net.or.jp - - [01/Jul/1995:00:00:19 -0400] "GET /images/launchpalms-small.gif HTTP/1.0" 200 11473
205.189.154.54 - - [01/Jul/1995:00:00:24 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
waters-gw.starway.net.au - - [01/Jul/1995:00:00:25 -0400] "GET /shuttle/missions/51-l/mission-51-l.html HTTP/1.0" 200 6723
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:27 -0400] "GET / HTTP/1.0" 200 7074
205.189.154.54 - - [01/Jul/1995:00:00:29 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
alyssa.prodigy.com - - [01/Jul/1995:00:00:33 -0400] "GET /shuttle/missions/sts-71/sts-71-patch-small.gif HTTP/1.0" 200 12054
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:35 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 200 5866
dial22.lloyd.com - - [01/Jul/1995:00:00:37 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0613.jpg HTTP/1.0" 200 61716
smyth-pc.moorecap.com - - [01/Jul/1995:00:00:38 -0400] "GET /history/apollo/apollo-13/images/70HC314.GIF HTTP/1.0" 200 101267
205.189.154.54 - - [01/Jul/1995:00:00:40 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
ix-orl2-01.ix.netcom.com - - [01/Jul/1995:00:00:41 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:41 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:41 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 200 363
205.189.154.54 - - [01/Jul/1995:00:00:41 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:41 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 200 234
ppp-mia-30.shadow.net - - [01/Jul/1995:00:00:43 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669
ix-orl2-01.ix.netcom.com - - [01/Jul/1995:00:00:44 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
gayle-gaston.tenet.edu - - [01/Jul/1995:00:00:50 -0400] "GET /shuttle/missions/sts-71/mission-sts-71.html HTTP/1.0" 200 12040
piweba3y.prodigy.com - - [01/Jul/1995:00:00:54 -0400] "GET /shuttle/missions/sts-71/sts-71-patch-small.gif HTTP/1.0" 200 12054
scheyer.clark.net - - [01/Jul/1995:00:00:58 -0400] "GET /shuttle/missions/sts-71/movies/sts-71-mir-dock-2.mpg HTTP/1.0" 200 49152
ppp-nyc-3-1.ios.com - - [01/Jul/1995:00:00:59 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0882.jpg HTTP/1.0" 200 77163
199.72.81.55 - - [01/Jul/1995:00:00:59 -0400] "GET /history/ HTTP/1.0" 200 1382
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:02 -0400] "GET /software/winvn/winvn.html HTTP/1.0" 200 9867
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:04 -0400] "GET /software/winvn/winvn.gif HTTP/1.0" 200 25218
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:04 -0400] "GET /images/construct.gif HTTP/1.0" 200 1414
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:04 -0400] "GET /software/winvn/bluemarb.gif HTTP/1.0" 200 4441
dd14-012.compuserve.com - - [01/Jul/1995:00:01:05 -0400] "GET /shuttle/technology/images/srb_16-small.gif HTTP/1.0" 200 42732
205.189.154.54 - - [01/Jul/1995:00:01:06 -0400] "GET /cgi-bin/imagemap/countdown?99,176 HTTP/1.0" 302 110
205.189.154.54 - - [01/Jul/1995:00:01:08 -0400] "GET /shuttle/missions/sts-71/images/images.html HTTP/1.0" 200 7634
www-a1.proxy.aol.com - - [01/Jul/1995:00:01:09 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
dd15-062.compuserve.com - - [01/Jul/1995:00:01:12 -0400] "GET /news/sci.space.shuttle/archive/sci-space-shuttle-22-apr-1995-40.txt HTTP/1.0" 404 -
205.212.115.106 - - [01/Jul/1995:00:01:13 -0400] "GET /shuttle/missions/sts-71/images/images.html HTTP/1.0" 200 7634
piweba3y.prodigy.com - - [01/Jul/1995:00:01:14 -0400] "GET /shuttle/technology/images/srb_mod_compare_3-small.gif HTTP/1.0" 200 55666
remote27.compusmart.ab.ca - - [01/Jul/1995:00:01:14 -0400] "GET /shuttle/missions/sts-71/sts-71-patch-small.gif HTTP/1.0" 200 12054
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:17 -0400] "GET /software/winvn/wvsmall.gif HTTP/1.0" 200 13372
ix-orl2-01.ix.netcom.com - - [01/Jul/1995:00:01:18 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
smyth-pc.moorecap.com - - [01/Jul/1995:00:01:19 -0400] "GET /history/apollo/images/footprint-small.gif HTTP/1.0" 200 18149
205.189.154.54 - - [01/Jul/1995:00:01:19 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0423.txt HTTP/1.0" 200 1224
www-b4.proxy.aol.com - - [01/Jul/1995:00:01:21 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 70712
smyth-pc.moorecap.com - - [01/Jul/1995:00:01:24 -0400] "GET /history/apollo/apollo-spacecraft.txt HTTP/1.0" 200 2261
slip1.yab.com - - [01/Jul/1995:00:01:26 -0400] "GET /shuttle/resources/orbiters/endeavour.html HTTP/1.0" 200 6168
link097.txdirect.net - - [01/Jul/1995:00:01:26 -0400] "GET /shuttle/missions/missions.html HTTP/1.0" 200 8677
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:27 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:27 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 200 363
remote27.compusmart.ab.ca - - [01/Jul/1995:00:01:27 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
link097.txdirect.net - - [01/Jul/1995:00:01:27 -0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853
slip1.yab.com - - [01/Jul/1995:00:01:29 -0400] "GET /shuttle/resources/orbiters/endeavour.gif HTTP/1.0" 200 16991
link097.txdirect.net - - [01/Jul/1995:00:01:31 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
link097.txdirect.net - - [01/Jul/1995:00:01:31 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:32 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 200 234
port26.annex2.nwlink.com - - [01/Jul/1995:00:01:32 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669
onyx.southwind.net - - [01/Jul/1995:00:01:34 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
onyx.southwind.net - - [01/Jul/1995:00:01:35 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
onyx.southwind.net - - [01/Jul/1995:00:01:39 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 304 0
unicomp6.unicomp.net - - [01/Jul/1995:00:01:41 -0400] "GET /htbin/cdt_main.pl HTTP/1.0" 200 3214
199.72.81.55 - - [01/Jul/1995:00:01:43 -0400] "GET / HTTP/1.0" 200 7074
link097.txdirect.net - - [01/Jul/1995:00:01:44 -0400] "GET /shuttle/missions/sts-78/mission-sts-78.html HTTP/1.0" 200 4377
link097.txdirect.net - - [01/Jul/1995:00:01:45 -0400] "GET /shuttle/missions/sts-78/sts-78-patch-small.gif HTTP/1.0" 200 4179
199.72.81.55 - - [01/Jul/1995:00:01:46 -0400] "GET /images/ksclogo-medium.gif HTTP/1.0" 200 5866
gater4.sematech.org - - [01/Jul/1995:00:01:46 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
link097.txdirect.net - - [01/Jul/1995:00:01:47 -0400] "GET /images/launch-logo.gif HTTP/1.0" 200 1713
ppp-nyc-3-1.ios.com - - [01/Jul/1995:00:01:49 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0917.jpg HTTP/1.0" 200 52491
gater3.sematech.org - - [01/Jul/1995:00:01:50 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
199.72.81.55 - - [01/Jul/1995:00:01:50 -0400] "GET /images/MOSAIC-logosmall.gif HTTP/1.0" 200 363
199.72.81.55 - - [01/Jul/1995:00:01:51 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 200 234
gater4.sematech.org - - [01/Jul/1995:00:01:52 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
199.72.81.55 - - [01/Jul/1995:00:01:52 -0400] "GET /images/WORLD-logosmall.gif HTTP/1.0" 200 669
ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:01:52 -0400] "GET /software/winvn/userguide/wvnguide.html HTTP/1.0" 200 5998
gater3.sematech.org - - [01/Jul/1995:00:01:52 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
remote27.compusmart.ab.ca - - [01/Jul/1995:00:01:53 -0400] "GET /cgi-bin/imagemap/countdown?102,174 HTTP/1.0" 302 110
remote27.compusmart.ab.ca - - [01/Jul/1995:00:01:55 -0400] "GET /shuttle/missions/sts-71/images/images.html HTTP/1.0" 200 7634
link097.txdirect.net - - [01/Jul/1995:00:01:55 -0400] "GET /shuttle/resources/orbiters/columbia.html HTTP/1.0" 200 6922
dave.dev1.ihub.com - - [01/Jul/1995:00:01:55 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
link097.txdirect.net - - [01/Jul/1995:00:01:56 -0400] "GET /shuttle/resources/orbiters/columbia-logo.gif HTTP/1.0" 200 11417
netport-27.iu.net - - [01/Jul/1995:00:01:57 -0400] "GET / HTTP/1.0" 200 7074
ix-or10-06.ix.netcom.com - - [01/Jul/1995:00:01:57 -0400] "GET /software/winvn/userguide/wvnguide.gif HTTP/1.0" 200 4151
dave.dev1.ihub.com - - [01/Jul/1995:00:01:58 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 200 786
dave.dev1.ihub.com - - [01/Jul/1995:03:01:58 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204
dave.dev1.ihub.com - - [01/Jul/1995:02:01:58 -0400] "GET /shuttle/countdown/count.gif HTTP/1.0" 200 40310
pm13.j51.com - - [01/Jul/1995:01:01:58 -0400] "GET /shuttle/missions/sts-71/movies/crew-arrival-t38.mpg HTTP/1.0" 200 305722
一、WordCount单词计数
- 理解mapreduce执行原理
- 掌握mapreduce程序开发技术
- 熟悉mapreduce作业提交流程
1.实验内容
- 准备数据文件
- mapreduce程序编写
- 程序测试及运行
2.实验原理
注意:MapReduce的处理过程主要分为Map阶段与Reduce阶段,
首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,
最后输出作为Reduce的输入,通过Reduce进行数据逻辑上的处理。
编写1个mapreduce程序进行wordcount统计,其中1个map类继承了Mapper类,1个reduce类继承了Reducer类,
还有1个主类用来提交作业对原始数据进行处理,统计文档中每个英文单词出现的次数。
首先按行读取待处理的文本,
在map阶段将每行文本拆分成单词,以单词(Text)作为map方法的输出键,以数值1(IntWritable)作为输出值,
最后在reduce阶段对每个键的值集合进行遍历并把遍历的值进行相加,输出结果即可。
3.实验步骤
(1)启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动Hadoop集群(HDFS与YARN)。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件
- step1:编辑数据文件word.txt。在终端窗口中,执行如下命令:
cd ~/data/dataset/
vim word.txt
在word.txt 文件中,输入如下内容,单词间以空格分隔:
good good study
day day up
保存并退出文件编辑。
- step2:将数据文件word.txt上传至HDFS的根目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/data/dataset/word.txt /
(3)创建Map/Reduce项目
创建Map/Reduce项目,编辑MapReduce程序,统计“word.txt”文本中的单词出现频率。
- step1:打开eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将hadoop相关的jar包添加到项目的构建路径(Build Path)中。
- step2:编写com.simple.WordCountMapper类完成对单词的切分处理,并以(k,v)的形式输出到Reduce阶段:
package com.simple;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map phase of word count: splits each input line on whitespace and emits
 * {@code (token, 1)} for every token found.
 *
 * <p>The input key (the line's byte offset) is not used. The output key and value
 * objects are held in fields and reused for every record instead of being
 * allocated per token.
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** The count emitted alongside every token; always 1. */
    private final IntWritable one = new IntWritable(1);

    /** Reusable holder for the token currently being emitted. */
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // StringTokenizer's default delimiters are space, tab, newline, CR and form feed.
        for (StringTokenizer tokenizer = new StringTokenizer(value.toString());
                tokenizer.hasMoreTokens(); ) {
            word.set(tokenizer.nextToken());
            context.write(word, one);
        }
    }
}
- step3:编写WordCountReducer类代码,实现对单词个数的统计,com.simple.WordCountReducer的Java类代码如下:
package com.simple;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce phase of word count: adds up all partial counts received for one word
 * and emits {@code (word, total)}.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        Iterator<IntWritable> counts = values.iterator();
        while (counts.hasNext()) {
            total += counts.next().get();
        }
        context.write(key, new IntWritable(total));
    }
}
- step4:创建com.simple.WordCountDriver驱动程序类,提交和运行作业代码如下:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
/**
 * Configures and submits the word-count job.
 *
 * <p>Input is {@code /word.txt} and output is the directory {@code /word-output}, both on
 * the HDFS instance at {@code hdfs://localhost:9000}. The process exits with 0 when the
 * job succeeds and 1 otherwise.
 */
public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        final String hdfsurl = "hdfs://localhost:9000";
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        // Use this exercise's own WordCountReducer rather than Hadoop's IntSumReducer, so the
        // class written in step3 is the one that actually runs. Summing counts is associative
        // and commutative, and the reducer's input and output types are both
        // (Text, IntWritable), so the same class can also serve as the map-side combiner.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(hdfsurl + "/word.txt"));
        FileOutputFormat.setOutputPath(job, new Path(hdfsurl + "/word-output"));
        // verbose = false: do not print job progress to the console.
        boolean flag = job.waitForCompletion(false);
        System.exit(flag ? 0 : 1);
    }
}
(4)程序测试及运行
运行WordCountDriver类,如果一切正常,则可以在HDFS上查看统计的结果文件。在终端窗口中执行如下命令:
hdfs dfs -cat /word-output/part-r-00000
可以看到单词计数的结果如下:
day 2
good 2
study 1
up 1
二、MapReduce数据去重
掌握去重的原理并使用MapReduce进行编程
1.实验内容
2.实验原理
- 目标:原始数据中出现次数超过1次的数据在输出文件中只出现1次。
- 算法思想:根据reduce的过程特性,会自动根据key来计算输入的value集合,把数据作为key输出给reduce,无论这个数据出现多少次,reduce最终结果中key只能输出1次。
- 实例中每个数据代表输入文件中的1行内容,map阶段采用Hadoop默认的作业输入方式。将value设置为key,并直接输出。 map输出数据的key为数据,将value设置成空值
- 在MapReduce流程中,map输出的<key,value>经过shuffle过程聚集成<key,value-list>后会交给reduce。
- reduce阶段不管每个key有多少个value,它直接将输入的key复制为输出的key,并输出(输出中的value被设置成空)。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动Hadoop集群(HDFS与YARN)。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件
- step1:查看源数据文件内容。在终端窗口中,执行如下命令:
cat ~/data/dataset/Deduplicationinfo.txt
文件内容如下:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-7 d
2012-3-3 c
- step2:将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/data/dataset/Deduplicationinfo.txt /
(3)创建Map/Reduce项目
- step1:打开eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将hadoop相关的jar包添加到项目的构建路径(Build Path)中。
- step2:创建com.simple.DeduplicationMapper的Java类,让其继承Mapper同时指定需要的参数类型,根据业务逻辑修改map类:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Map phase of deduplication: emits each input line unchanged as the output key,
 * paired with an empty value.
 *
 * <p>Because the whole line becomes the key, identical lines end up in the same reduce
 * group, where DeduplicationReducer writes that key exactly once.
 */
public class DeduplicationMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The value carries no information; only the key (the line) matters.
        context.write(value, new Text(""));
    }
}
- step3:新建1个类名为com.simple.DeduplicationReducer并继承Reducer类,然后添加该类中的代码内容如下所示。
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce phase of deduplication: writes each distinct key once with an empty value.
 *
 * <p>The grouped values are never read. How many times a line occurred is irrelevant;
 * emitting only the key once per group is what removes the duplicates.
 */
public class DeduplicationReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        Text emptyValue = new Text("");
        context.write(key, emptyValue);
    }
}
- step4:在项目src目录下右键点击,新建1个测试主类名为com.simple.TestDeduplication并指定main主方法,测试代码如下所示:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the deduplication job.
 *
 * <p>Sets {@code fs.defaultFS} to {@code hdfs://localhost:9000}, reads
 * {@code /Deduplicationinfo.txt} and writes the distinct lines to {@code /simple/output}.
 * Exits with 0 on success and 1 on failure.
 */
public class TestDeduplication {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");

        Job job = Job.getInstance(conf);
        job.setJarByClass(TestDeduplication.class);

        // Mapper and reducer both emit (Text, Text); no separate map-output classes are set.
        job.setMapperClass(DeduplicationMapper.class);
        job.setReducerClass(DeduplicationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        Path input = new Path("/Deduplicationinfo.txt");
        Path output = new Path("/simple/output");
        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
(4)程序测试及运行
运行TestDeduplication类,控制台打印如下图所示,且无错误日志产生,程序运行完毕。
程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
hdfs dfs -ls /simple/output/
hdfs dfs -cat /simple/output/part-r-00000
三、MapReduce数据排序
理解排序的原理并使用MapReduce编写程序
1.实验内容
2.实验原理
在MapReduce操作时,传递的<key,value> 会按照key的大小进行排序,最后输出的结果是按照key排过序的。
在key排序的基础上,对value也进行排序,这种需求就是二次排序。
二次排序是指框架在对key2排序之后,还需要对同一key2对应的value2再进行一次排序。
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块(split),同时InputFormat提供1个RecordReader的实现。
本例子中使用的是TextInputFormat,它提供的RecordReader会将每行文本的字节偏移量作为key,这一行的文本作为value。
核心总结:
- map最后阶段进行partition分区,一般使用job.setPartitionerClass设置的类;如果没有设置,则使用默认的HashPartitioner,根据Key的hashCode()值进行分区。
- (第1次排序)每个分区内部调用job.setSortComparatorClass设置的key的比较函数类进行排序,如果没有则使用Key的实现的compareTo方法。
- (第2次排序)当reduce接收到所有map传输过来的数据之后,调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用Key的实现的compareTo方法。
- 紧接着使用job.setGroupingComparatorClass设置的分组函数类进行分组,被判定为同一组的Key所对应的value放在同一个迭代器里面。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动Hadoop集群(HDFS与YARN)。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件
- step1:查看源数据文件内容。在终端窗口中,执行如下命令:
cat ~/data/dataset/SecondarySort.txt
可以看到,文件内容如下:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
- step2:将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/data/dataset/SecondarySort.txt /
(3)创建Map/Reduce项目
- step1:打开eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将hadoop相关的jar包添加到项目的构建路径(Build Path)中。
- step2:创建com.simple02.IntPair的Java类,该类是对给定数据的两列值的封装,并作为mapper的输出键对象。实现代码如下:
package com.simple02;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Composite map-output key holding the two integer columns of one input line.
 *
 * <p>Keys order by {@code first}, then by {@code second}. That ordering is what makes the
 * values of each first-column group arrive sorted by the second column (the secondary sort).
 */
public class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;

    /** Creates a pair with both fields 0; they are filled later by the setters or readFields. */
    public IntPair() {
        super();
    }

    public IntPair(int first, int second) {
        super();
        this.first = first;
        this.second = second;
    }

    public int getFirst() {
        return first;
    }

    public void setFirst(int first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + first;
        result = prime * result + second;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        IntPair other = (IntPair) obj;
        if (first != other.first)
            return false;
        if (second != other.second)
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "IntPair [first=" + first + ", second=" + second + "]";
    }

    /**
     * Orders pairs by {@code first}, then by {@code second}.
     *
     * <p>Returns 0 exactly when both fields are equal, consistent with {@link #equals}. The
     * Comparable contract requires {@code x.compareTo(x) == 0}; without it, equal keys can
     * never be recognized as equal when map output is sorted or grouped.
     * {@link Integer#compare} is used instead of subtraction so large values cannot overflow.
     */
    @Override
    public int compareTo(IntPair intPair) {
        int byFirst = Integer.compare(first, intPair.first);
        return byFirst != 0 ? byFirst : Integer.compare(second, intPair.second);
    }

    /** Deserializes the pair; field order must match {@link #write}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readInt();
        this.second = in.readInt();
    }

    /** Serializes {@code first} then {@code second} as two 4-byte ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);
        out.writeInt(second);
    }
}
- step3:创建com.simple02.FirstPartitioner类,按照键(IntPair)的第一个字段对map输出的结果进行分区设置。代码实现如下:
package com.simple02;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Assigns map-output records to reducers by the first field of the {@link IntPair} key only.
 *
 * <p>Every key with the same first field lands in the same partition, so a grouping step
 * that works on the first field sees all of that field's records in one reducer.
 */
public class FirstPartitioner extends Partitioner<IntPair, Text> {
    @Override
    public int getPartition(IntPair key, Text value, int numPartitions) {
        // Take the remainder before the absolute value. key.getFirst() * 127 can overflow to
        // exactly Integer.MIN_VALUE, and Math.abs(Integer.MIN_VALUE) is still negative, which
        // would yield an illegal negative partition. For every other value, |x % n| == |x| % n,
        // so the key-to-partition mapping is unchanged.
        return Math.abs((key.getFirst() * 127) % numPartitions);
    }
}
- step4:创建com.simple02.GroupingComparator类,对处理的数据进行分组设置。实现代码如下:
package com.simple02;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class GroupingComparator extends WritableComparator {
protected GroupingComparator() {
super(IntPair.class, true);
}
@SuppressWarnings("rawtypes")
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return ip1.compareTo(ip2);
}
}
- step5:创建com.simple02.SecondarySortMapper类,继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容:
package com.simple02;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortMapper extends Mapper<LongWritable, Text, IntPair, Text> {
private final IntPair keyPair = new IntPair();
String[] lineArr = null;
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
lineArr = line.split(" ", -1);
keyPair.setFirst(Integer.parseInt(lineArr[0]));
keyPair.setSecond(Integer.parseInt(lineArr[1]));
context.write(keyPair, value);
}
}
- step6:创建com.simple02.SecondarySortReducer类,继承Reducer类,然后添加该类中的代码内容如下所示:
package com.simple02;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortReducer extends Reducer<IntPair, Text, Text, Text> {
private static final Text SEPARATOR = new Text("---------------------");
public void reduce(IntPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
context.write(SEPARATOR, null);
for (Text val : values) {
context.write(null, val);
}
}
}
- step7:创建com.simple02.SecondarySortJob类,并指定main主方法。测试代码如下所示:
package com.simple02;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the secondary-sort job.
 *
 * <p>Reads {@code /SecondarySort.txt} and writes to {@code /simple/output2} on the HDFS
 * instance at {@code hdfs://localhost:9000}. Exits with 0 on success and 1 on failure.
 * The output directory deliberately differs from the {@code /simple/output} used by
 * TestDeduplication. Hadoop refuses to write into an existing output directory, so sharing
 * one made the labs fail when run in order.
 */
public class SecondarySortJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Job job = Job.getInstance(conf);
        job.setJarByClass(SecondarySortJob.class);
        job.setMapperClass(SecondarySortMapper.class);
        job.setReducerClass(SecondarySortReducer.class);
        // Map output (IntPair, Text) differs from final output (Text, Text), so both are set.
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(Text.class);
        // Partition and group on the first field; IntPair.compareTo still sorts by both fields.
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(GroupingComparator.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("/SecondarySort.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/simple/output2"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)程序测试及运行
运行SecondarySortJob类,查看控制台显示内容,确认程序是否正确执行。如下图所示:
程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
hdfs dfs -cat /simple/output2/*
四、MapReduce数据分区
掌握分区的原理以及使用mapreduce进行编程
1.实验内容
2.实验原理
Hadoop采用的派发方式默认是根据散列值来派发,当数据进行map转换后,根据map后数据的key值进行散列派发,
这样的弊端就是当数据key的值过于相似且集中时,大部分的数据就会分到同一个reducer中,从而造成数据倾斜,影响程序的运行效率。
所以需要我们定制partition根据自己的要求,选择记录的reducer。
自定义partitioner很简单,只要自定义1个类继承Partitioner类,重写getPartition方法就好了,
在使用的时候通过调用Job的setPartitionerClass指定一下即可。
Map的结果会通过partition分发到Reducer上。
如果设置了Combiner,Map的结果会先送到Combiner进行合并,再将合并后数据发送给Reducer。
Mapper最终处理的键值对<key,value> ,是需要送到Reducer去合并的,有相同key的键/值对会送到同1个Reducer。
哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有1个方法:
getPartition(KEY key, VALUE value, int numPartitions)
系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。
这样就保证了相同的key值一定会被分配到同一个reducer上。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动Hadoop集群(HDFS与YARN)。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件
- step1:查看源数据文件内容。在终端窗口中,执行如下命令:
cat ~/data/dataset/StuAgeCata.txt
可以看到,文件内容如下:
tom 13
jerry 28
lisa 34
marry 22
tonny 17
kaisa 18
bruce 29
- step2:将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/data/dataset/StuAgeCata.txt /
(3)创建Map/Reduce项目
- step1:打开eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将hadoop相关的jar包添加到项目的构建路径(Build Path)中。
- step2:创建com.simple03.StudentWritable的Java类,该类对给定数据的两列值(姓名、年龄)进行封装,并作为mapper的输出value对象。
package com.simple03;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/**
 * Serializable record of one student: a name and an age.
 *
 * <p>Wire format: the name as modified UTF-8 ({@code writeUTF}) followed by the age as a
 * 4-byte int. {@link #write} and {@link #readFields} must keep this order in sync.
 * {@code write} requires a non-null name, because {@code writeUTF(null)} throws.
 */
public class StudentWritable implements Writable {
    private String name;
    private int age;

    /** Creates an empty record (null name, age 0), to be filled by readFields or setters. */
    public StudentWritable() {
    }

    public StudentWritable(String name, int age) {
        this.name = name;
        this.age = age;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        age = in.readInt();
    }

    @Override
    public String toString() {
        return new StringBuilder("StudentWritable [name=")
                .append(name)
                .append(", age=")
                .append(age)
                .append("]")
                .toString();
    }
}
- step3:创建com.simple03.StuPartitioner的Java类,该类按年龄对map输出的结果进行分区设置。代码实现如下:
package com.simple03;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes students to one of two reducers by age: ages below 18 go to partition 0,
 * and ages 18 and above go to partition 1.
 *
 * <p>Always returns 0 or 1 regardless of {@code numPartitions}; the driver
 * (TestStuMapReducer) configures two reduce tasks to match.
 */
public class StuPartitioner extends Partitioner<NullWritable, StudentWritable> {
    private static final int ADULT_AGE = 18;
    private static final int MINOR_PARTITION = 0;
    private static final int ADULT_PARTITION = 1;

    @Override
    public int getPartition(NullWritable key, StudentWritable value, int numPartitions) {
        return value.getAge() < ADULT_AGE ? MINOR_PARTITION : ADULT_PARTITION;
    }
}
- step4:创建com.simple03.StudentMapper的Java类,继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类:
package com.simple03;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Parses lines of the form {@code "<name> <age>"} into {@link StudentWritable} values,
 * emitted under a single {@link NullWritable} key.
 *
 * <p>Blank lines are skipped, and columns may be separated by any run of whitespace.
 * A line missing the age column, or with a non-numeric age, still fails with the resulting
 * runtime exception so bad data is not silently dropped.
 */
public class StudentMapper extends Mapper<LongWritable, Text, NullWritable, StudentWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, StudentWritable>.Context context) throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            // e.g. a trailing empty line; indexing stuArr[1] would throw.
            return;
        }
        String[] stuArr = line.split("\\s+");
        context.write(NullWritable.get(), new StudentWritable(stuArr[0], Integer.parseInt(stuArr[1])));
    }
}
- step5:创建com.simple03.StudentReducer的Java类,并继承Reducer类,然后添加该类中的代码内容如下所示:
package com.simple03;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Writes every student received in the group as one text line (its {@code toString()}
 * form) under a {@link NullWritable} key.
 */
public class StudentReducer extends Reducer<NullWritable, StudentWritable, NullWritable, Text> {
    @Override
    protected void reduce(NullWritable key, Iterable<StudentWritable> iter, Reducer<NullWritable, StudentWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
        for (StudentWritable student : iter) {
            Text line = new Text(student.toString());
            context.write(NullWritable.get(), line);
        }
    }
}
- step6:新建测试主类com.simple03.TestStuMapReducer,并指定main主方法,编写测试代码如下:
package com.simple03;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the student partitioning job.
 *
 * <p>Reads {@code /StuAgeCata.txt} and writes two result files (one per reduce task) to
 * {@code /simple/output3} on the HDFS instance at {@code hdfs://localhost:9000}. The output
 * directory deliberately differs from the {@code /simple/output} used by the deduplication
 * job, because Hadoop refuses to write into an existing output directory. Exits with 0 on
 * success and 1 on failure, like the other drivers.
 */
public class TestStuMapReducer {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Job job = Job.getInstance(conf);
        job.setJarByClass(TestStuMapReducer.class);
        job.setMapperClass(StudentMapper.class);
        job.setReducerClass(StudentReducer.class);
        // StuPartitioner returns partition 0 or 1, so exactly two reduce tasks are needed.
        job.setPartitionerClass(StuPartitioner.class);
        job.setNumReduceTasks(2);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(StudentWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("/StuAgeCata.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/simple/output3"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)程序测试及运行
运行TestStuMapReducer类,查看控制台显示内容,确认程序是否正确执行。如下图所示:
程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
hdfs dfs -ls /simple/output3/
hdfs dfs -cat /simple/output3/part-r-00000
hdfs dfs -cat /simple/output3/part-r-00001
五、MapReduce处理Mapper端多类型value值
掌握使用GenericWritable类来包装多个属于不同数据类型的value实例。
1.实验内容
Web日志分析,包括:
- 自定义LogWritable类
- 实现GenericWritable数据类型
- 实现Mapper类
- 实现Reducer类
- 实现Driver类
- 作业提交到集群上运行
2.实验原理
在执行Reducer端join时,为了避免使用多个MapReduce计算来汇总同一数据集中不同类型属性所带来的复杂性,
从Mapper发射属于多种value类型的数据是很有用的。
但是Hadoop Reducer并不允许多个input value类型。
在这种场景下,我们可以使用GenericWritable类来包装多个属于不同数据类型的value实例。
我们使用HTTP服务器日志项分析。
在这个示例中,从Mapper输出多个不同的value类型。
程序会按host汇总web服务器返回给该host的字节总数,并输出该host所请求的URL列表(用tab分隔)。
这里我们使用IntWritable 来输出来自Mapper的字节数量,使用Text 来输出请求的URL。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动Hadoop集群(HDFS与YARN)。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件★
- step1:查看源日志文件部分内容。在终端窗口中,执行如下命令:
cat ~/data/dataset/NASA_log_sample.txt | head -5
可以看到,文件内容如下:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
- step2:将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/data/dataset/NASA_log_sample.txt /
(3)创建Map/Reduce项目
- step1:打开eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将hadoop相关的jar包添加到项目的构建路径(Build Path)中。
- step2:创建com.simple04.LogWritable的Java类,它实现了Writable接口,表示1条日志信息。编辑源代码如下:
package com.simple04;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
 * One web-server log entry: client host/IP, timestamp, requested URL, HTTP status code
 * and response size.
 *
 * <p>All five fields are Hadoop Writable wrappers created once in the no-arg constructor.
 * An instance can be refilled with {@link #set} and deserialized in place with
 * {@link #readFields}. Fields go over the wire in the fixed order userIP, timestamp, url,
 * status, responseSize; {@link #fieldsInWireOrder()} is the single place that order is
 * defined.
 */
public class LogWritable implements Writable {
    private Text userIP;
    private Text timestamp;
    private Text url;
    private IntWritable status;
    private IntWritable responseSize;

    /** Creates an entry whose fields are all empty / zero. */
    public LogWritable() {
        userIP = new Text();
        timestamp = new Text();
        url = new Text();
        status = new IntWritable();
        responseSize = new IntWritable();
    }

    /** Overwrites every field in place with the given values. */
    public void set(String userIP, String timestamp, String url, int status, int responseSize) {
        this.userIP.set(userIP);
        this.timestamp.set(timestamp);
        this.url.set(url);
        this.status.set(status);
        this.responseSize.set(responseSize);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        for (Writable field : fieldsInWireOrder()) {
            field.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        for (Writable field : fieldsInWireOrder()) {
            field.readFields(in);
        }
    }

    /** The current field objects, in the order they are serialized. */
    private Writable[] fieldsInWireOrder() {
        return new Writable[] {userIP, timestamp, url, status, responseSize};
    }

    public Text getUserIP() {
        return userIP;
    }

    public Text getTimestamp() {
        return timestamp;
    }

    public Text getUrl() {
        return url;
    }

    public IntWritable getStatus() {
        return status;
    }

    public IntWritable getResponseSize() {
        return responseSize;
    }

    /** Replaces (does not copy into) the userIP field object. */
    public void setUserIP(Text userIP) {
        this.userIP = userIP;
    }

    /** Replaces (does not copy into) the timestamp field object. */
    public void setTimestamp(Text timestamp) {
        this.timestamp = timestamp;
    }

    /** Replaces (does not copy into) the url field object. */
    public void setUrl(Text url) {
        this.url = url;
    }

    /** Replaces (does not copy into) the status field object. */
    public void setStatus(IntWritable status) {
        this.status = status;
    }

    /** Replaces (does not copy into) the responseSize field object. */
    public void setResponseSize(IntWritable responseSize) {
        this.responseSize = responseSize;
    }
}
- step3:实现GenericWritable数据类型:创建com.simple04.MultiValueWritable的Java类,它继承自GenericWritable类,可以包装多种不同类型的value。编辑源代码如下:
package com.simple04;
import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
/**
 * Wraps either a {@link LogWritable} or an {@link IntWritable}. This lets a single Mapper
 * emit two different value types under one declared map-output value class.
 */
public class MultiValueWritable extends GenericWritable {

    /**
     * The value types this wrapper may carry. GenericWritable identifies the wrapped type by its
     * position in this array, so the order must never change between writer and reader. The
     * array is final and private, so it cannot be swapped out at runtime.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static final Class<? extends Writable>[] CLASSES =
            new Class[] {LogWritable.class, IntWritable.class};

    /** No-argument constructor, kept so the framework can instantiate the wrapper before deserializing. */
    public MultiValueWritable() {
    }

    /** Wraps {@code w}, whose class should be one of those listed in {@link #CLASSES}. */
    public MultiValueWritable(Writable w) {
        set(w);
    }

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return CLASSES;
    }
}
- step4:创建com.simple04.LogMapper的Java类,它继承自Mapper类。编辑源代码如下:
package com.simple04;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Parses one access-log line and emits two values under the requesting host as key:
 * the full {@link LogWritable} record, and the response size as an {@link IntWritable}.
 * Both are wrapped in {@link MultiValueWritable}.
 */
public class LogMapper extends Mapper<LongWritable, Text, Text, MultiValueWritable> {

    /**
     * host ident authuser [timestamp] "request" status bytes.
     * The server writes "-" in the bytes field when no body was sent (e.g. many 304/404 lines).
     * Such lines used to fail the match and were dropped; they are now accepted and counted as 0.
     * Compiled once because Pattern is immutable and compiling per record is wasted work.
     */
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-)");

    /** Reused output key; the same instance is passed to both writes for a record. */
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Matcher matcher = LOG_PATTERN.matcher(value.toString());
        if (!matcher.matches()) {
            System.out.println("不是1个有效的日志记录");
            return;
        }
        String ip = matcher.group(1);
        String timestamp = matcher.group(4);
        // Group 5 is the whole quoted request line, e.g. "GET /path HTTP/1.0".
        String url = matcher.group(5);
        int status = Integer.parseInt(matcher.group(6));
        String sizeField = matcher.group(7);
        int responseSize = "-".equals(sizeField) ? 0 : Integer.parseInt(sizeField);

        LogWritable log = new LogWritable();
        log.set(ip, timestamp, url, status, responseSize);

        outKey.set(ip);
        context.write(outKey, new MultiValueWritable(log));
        context.write(outKey, new MultiValueWritable(new IntWritable(responseSize)));
    }
}
- step5:创建com.simple04.LogReducer的Java类,它继承自Reducer类。编辑源代码如下:
package com.simple04;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * For each host, writes one line made of every requested URL followed by a tab,
 * then ':' and the summed response size.
 * IntWritable payloads are added to the total; every other payload is treated as a LogWritable.
 */
public class LogReducer extends Reducer<Text, MultiValueWritable, Text, Text> {

    /** Output value reused for every key. */
    private final Text text = new Text();

    @Override
    protected void reduce(Text key, Iterable<MultiValueWritable> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder urls = new StringBuilder();
        int totalBytes = 0;
        for (MultiValueWritable wrapper : values) {
            Writable inner = wrapper.get();
            if (!(inner instanceof IntWritable)) {
                // Anything that is not a size must be the full log record.
                urls.append(((LogWritable) inner).getUrl()).append('\t');
                continue;
            }
            totalBytes += ((IntWritable) inner).get();
        }
        text.set(urls.append(':').append(totalBytes).toString());
        context.write(key, text);
    }
}
- step6:创建com.simple04.LogDriver的Java类,这是驱动程序类。编辑源代码如下:
package com.simple04;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the GenericWritable example. It runs LogMapper/LogReducer, which output, per host,
 * the tab-separated requests followed by ':' and the total response bytes.
 *
 * <p>Usage: {@code LogDriver [input [output]]}. Without arguments the job reads the sample log
 * uploaded to the HDFS {@code /data} directory. It writes to {@code /data/log-output}, the
 * directory inspected afterwards with {@code hdfs dfs -tail}. The output directory must not
 * exist yet, otherwise Hadoop rejects the job.
 */
public class LogDriver {

    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/data/NASA_log_sample.txt";
    private static final String DEFAULT_OUTPUT = "hdfs://localhost:9000/data/log-output";

    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "日志分析");
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);
        // The map emits MultiValueWritable while the reducer writes Text,
        // so the intermediate and final value types are declared separately.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MultiValueWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)程序测试及运行
运行LogDriver类文件,程序运行后,在Eclipse控制台如果无错误日志产生,则程序运行完毕且正确。
若控制台打印“不是1个有效的日志记录”,说明该行日志与Mapper中的正则表达式不匹配(例如响应字节数字段为“-”,或行格式不完整);Mapper会跳过这些行并继续处理其余记录,因此结果仍能正常输出。
程序执行完毕之后,查看输出结果。在终端窗口中,执行如下命令:
hdfs dfs -tail /data/log-output/part-r-00000
可以看到如下的计算结果:
六、MapReduce实现并使用自定义InputFormat
掌握为MapReduce计算实现并指定自定义的InputFormat实现,来获得对输入数据更多的控制。
1.实验内容
为HTTP日志文件实现1个自定义的InputFormat和RecordReader。
这个InputFormat将生成LongWritable类型的key,LogWritable类型的value。包括:
- 自定义LogWritable类
- 自定义LogRecordReader类
- 自实现LogFileInputFormat类
- 实现Mapper类
- 实现Reducer类
- 实现Driver类
- 作业提交到集群上运行
2.实验原理
自定义的InputFormat实现应该继承org.apache.hadoop.mapreduce.InputFormat<K,V> 抽象类,并重写createRecordReader()和getSplits()方法。
LogFileInputFormat继承自FileInputFormat, 而FileInputFormat为基于HDFS文件的InputFormat提供了1个通用的Splitting机制。
我们在LogFileInputFormat中重写createRecordReader()方法,
以提供1个我们自定义的RecordReader实现的实例-LogFileRecordReader。
LogFileRecordReader类继承自org.apache.hadoop.mapreduce.RecordReader<K,V>抽象类,
内部使用LineRecordReader来执行输入数据的基本解析。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动HDFS集群。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,执行如下命令,查看HDFS服务启动情况:
jps
(2)准备数据文件★
- step1:查看源日志文件部分内容。在终端窗口中,执行如下命令:
cat ~/dataset/NASA_log_sample.txt | head -5
可以看到,文件内容如下:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
- step2:将数据源文件上传至HDFS的/data目录下(若/data目录尚不存在,请先执行 hdfs dfs -mkdir -p /data)。在终端窗口中,执行如下命令:
hdfs dfs -put ~/dataset/NASA_log_sample.txt /data
(3)创建Map/Reduce项目
- step1:打开Eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将Hadoop相关的jar包导入到项目的构建路径(Build Path)中。
- step2:创建com.simple05.LogWritable的Java类,它实现了Writable接口,表示1条日志信息。编辑源代码如下:
package com.simple05;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
/**
 * One parsed web-server log entry that Hadoop can serialize, used here as the value type
 * produced by the custom record reader.
 *
 * <p>Every field is a Hadoop writable wrapper allocated up front, so an instance can be refilled
 * through {@link #set} or {@link #readFields} without allocating new field objects. A public
 * no-argument constructor is kept, presumably so Hadoop can instantiate the class reflectively.
 */
public class LogWritable implements Writable {

    private Text userIP = new Text();
    private Text timestamp = new Text();
    private Text url = new Text();
    private IntWritable status = new IntWritable();
    private IntWritable responseSize = new IntWritable();

    /** Creates an entry with empty text fields and zero numeric fields. */
    public LogWritable() {
        // All fields are initialized at their declarations above.
    }

    /** Overwrites all five fields in place (the wrapper objects themselves are reused). */
    public void set(String userIP, String timestamp, String url, int status, int responseSize) {
        this.userIP.set(userIP);
        this.timestamp.set(timestamp);
        this.url.set(url);
        this.status.set(status);
        this.responseSize.set(responseSize);
    }

    public Text getUserIP() { return userIP; }

    /** Stores the given instance itself (no copy is made). */
    public void setUserIP(Text userIP) { this.userIP = userIP; }

    public Text getTimestamp() { return timestamp; }

    /** Stores the given instance itself (no copy is made). */
    public void setTimestamp(Text timestamp) { this.timestamp = timestamp; }

    public Text getUrl() { return url; }

    /** Stores the given instance itself (no copy is made). */
    public void setUrl(Text url) { this.url = url; }

    public IntWritable getStatus() { return status; }

    /** Stores the given instance itself (no copy is made). */
    public void setStatus(IntWritable status) { this.status = status; }

    public IntWritable getResponseSize() { return responseSize; }

    /** Stores the given instance itself (no copy is made). */
    public void setResponseSize(IntWritable responseSize) { this.responseSize = responseSize; }

    /**
     * The fields in wire order. Both write() and readFields() walk this same list, so the
     * serialization order cannot drift between the two. It is built on each call so that it
     * always reflects the current field references, even after a setter has replaced one.
     */
    private Writable[] fieldsInWireOrder() {
        return new Writable[] {userIP, timestamp, url, status, responseSize};
    }

    @Override
    public void write(DataOutput out) throws IOException {
        for (Writable field : fieldsInWireOrder()) {
            field.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        for (Writable field : fieldsInWireOrder()) {
            field.readFields(in);
        }
    }
}
- step3:创建com.simple05.LogRecordReader的Java类,它继承自RecordReader类,内部使用LineRecordReader逐行读取输入,并将每行日志解析为LogWritable类型的value。编辑源代码如下:
package com.simple05;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
/**
 * RecordReader that turns each line of an HTTP access log into a {@link LogWritable}.
 *
 * <p>Reading lines and handling split boundaries is delegated to Hadoop's
 * {@link LineRecordReader}; this class only parses each line. Keys are passed through from the
 * wrapped LineRecordReader (presumably the line's byte offset). Lines that do not match the
 * log pattern are reported and skipped.
 */
public class LogRecordReader extends RecordReader<LongWritable, LogWritable> {

    /**
     * host ident authuser [timestamp] "request" status bytes.
     * The bytes field may be "-" (no body sent); it is read as 0 instead of rejecting the line.
     */
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+|-)");

    private LineRecordReader rr;
    private LogWritable value;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        rr = new LineRecordReader();
        rr.initialize(split, context);
    }

    /**
     * Advances to the next well-formed log line.
     *
     * @return true if a record was parsed; false once the underlying reader is exhausted
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // Loop rather than recurse, so a long run of malformed lines cannot overflow the stack.
        while (rr.nextKeyValue()) {
            Matcher matcher = LOG_PATTERN.matcher(rr.getCurrentValue().toString());
            if (!matcher.matches()) {
                System.out.println("无效的记录");
                continue;
            }
            String ip = matcher.group(1);
            String timestamp = matcher.group(4);
            // Group 5 is the whole quoted request line, e.g. "GET /path HTTP/1.0".
            String url = matcher.group(5);
            int status = Integer.parseInt(matcher.group(6));
            String sizeField = matcher.group(7);
            int responseSize = "-".equals(sizeField) ? 0 : Integer.parseInt(sizeField);
            value = new LogWritable();
            value.set(ip, timestamp, url, status, responseSize);
            return true;
        }
        return false;
    }

    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return rr.getCurrentKey();
    }

    @Override
    public LogWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return rr.getProgress();
    }

    /** Closes the wrapped line reader; a no-op if initialize() was never called. */
    @Override
    public void close() throws IOException {
        if (rr != null) {
            rr.close();
        }
    }
}
- step4:创建com.simple05.LogFileInputFormat的Java类,它继承自FileInputFormat类,通过重写createRecordReader()方法返回自定义的LogRecordReader。编辑源代码如下:
package com.simple05;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * File-based InputFormat for HTTP access logs. It uses FileInputFormat's standard file splitting
 * and reads each split with {@link LogRecordReader}, producing (LongWritable, LogWritable) pairs.
 */
public class LogFileInputFormat extends FileInputFormat<LongWritable, LogWritable> {

    /**
     * Returns a fresh, not-yet-initialized {@link LogRecordReader}. The reader's own
     * initialize(split, context) must be called before use, presumably by the MapReduce framework.
     */
    @Override
    public RecordReader<LongWritable, LogWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        final RecordReader<LongWritable, LogWritable> reader = new LogRecordReader();
        return reader;
    }
}
- step5:创建com.simple05.LogMapper的Java类,它继承自Mapper类。编辑源代码如下:
package com.simple05;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Re-keys each already-parsed log record by its user IP, so the reducer receives
 * all records for one host together. The record itself is passed through unchanged.
 */
public class LogMapper extends Mapper<LongWritable, LogWritable, Text, LogWritable> {

    @Override
    protected void map(LongWritable key, LogWritable value, Context context) throws IOException, InterruptedException {
        final Text host = value.getUserIP();
        context.write(host, value);
    }
}
- step6:创建com.simple05.LogProcessorReduce的Java类,它继承自Reducer类。编辑源代码如下:
package com.simple05;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/** Writes, for each key (a host), the sum of getResponseSize() over all of its log records. */
public class LogProcessorReduce extends Reducer<Text, LogWritable, Text, IntWritable> {

    /** Output value reused for every key. */
    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<LogWritable> values, Context context) throws IOException, InterruptedException {
        result.set(totalResponseSize(values));
        context.write(key, result);
    }

    /** Adds up the response sizes of the given records using int arithmetic. */
    private static int totalResponseSize(Iterable<LogWritable> records) {
        int total = 0;
        for (LogWritable record : records) {
            total += record.getResponseSize().get();
        }
        return total;
    }
}
- step7:创建com.simple05.LogDriver的Java类,这是驱动程序类。编辑源代码如下:
package com.simple05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the custom-InputFormat example. It reads the log through
 * {@link LogFileInputFormat} and writes the total response bytes per host.
 *
 * <p>Usage: {@code LogDriver [input [output]]}. Without arguments the job reads the sample log
 * uploaded to the HDFS {@code /data} directory. It writes to {@code /data/log-output2}, the
 * directory inspected afterwards with {@code hdfs dfs -tail}. That directory is distinct from
 * the GenericWritable example's output, so the two jobs do not collide. The output directory
 * must not exist yet, otherwise Hadoop rejects the job.
 */
public class LogDriver {

    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/data/NASA_log_sample.txt";
    private static final String DEFAULT_OUTPUT = "hdfs://localhost:9000/data/log-output2";

    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "日志分析");
        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogProcessorReduce.class);
        // Lines are parsed into LogWritable by the custom InputFormat instead of TextInputFormat.
        job.setInputFormatClass(LogFileInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)程序测试及运行
运行LogDriver类文件后,在Eclipse控制台如果无错误日志产生,则程序运行完毕且正确。
程序执行完毕之后,查看输出结果。在终端窗口中,执行如下命令:
hdfs dfs -tail /data/log-output2/part-r-00000
可以看到如下的计算结果:
七、MapReduce使用分布式缓存
掌握使用Hadoop分布式缓存(Distributed Cache)向Map/Reduce任务分发只读的数据文件,并在任务中加载使用。
1.实验内容
Web日志分析,包括:
- 自定义LogWritable类
- 通过分布式缓存加载IP与城市的映射文件(ip2locale.txt)
- 实现Mapper类
- 实现Reducer类
- 实现Driver类
- 作业提交到集群上运行
2.实验原理
我们可以使用Hadoop Distributed Cache(分布式缓存)来分发只读的、基于文件的资源给Map和Reduce任务。
这些资源可以是简单的数据文件、档案文件(archives)或Mapper/Reducer执行计算所需的JAR文件。
Hadoop在执行job的任何task之前,会先将分布式缓存中的文件拷贝到所有的工作节点上。
对于每个job,Distributed Cache只拷贝这些文件1次。
在Mapper或Reducer的setup() 方法中解析和加载来自Distributed Cache的数据。
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动HDFS集群。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,查看启动的服务,确保如下的进程都已正常启动:
jps
2998 ResourceManager
3110 NodeManager
2488 NameNode
2626 DataNode
2812 SecondaryNameNode
(2)准备数据文件★
- step1:查看源日志文件部分内容。在终端窗口中,执行如下命令:
cat ~/dataset/NASA_log_sample.txt | head -5
可以看到,文件内容如下:
199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
- step2:将数据源文件上传至HDFS的/data目录下(若/data目录尚不存在,请先执行 hdfs dfs -mkdir -p /data)。在终端窗口中,执行如下命令:
hdfs dfs -put ~/dataset/NASA_log_sample.txt /data
- step3:查看需要在集群中缓存的数据文件ip2locale.txt。在终端窗口中,执行如下命令:
cat ~/dataset/ip2locale.txt
可以看到,文件内容如下:
199.120.110.21 北京市
199.72.81.55 上海市
205.189.154.54 广州市
205.212.115.106 深圳市
129.94.144.152 成都市
它存储了用户IP地址和所在城市的映射关系,在map端用来将用户的访问IP替换为所在城市。
- step4:将需要在集群中缓存的数据文件“ip2locale.txt”上传至HDFS的/data目录下。在终端窗口中,执行如下命令:
hdfs dfs -put ~/dataset/ip2locale.txt /data
(3)创建Map/Reduce项目
- step1:打开Eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将Hadoop相关的jar包导入到项目的构建路径(Build Path)中。
- step2:创建com.simple06.LogWritable的Java类,它实现了WritableComparable接口,表示1条日志信息,并且可以用作key类型。编辑源代码如下:
package com.simple06;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/**
 * One parsed web-server log entry that Hadoop can serialize. It is ordered by user IP and then
 * by timestamp, so it can also be used as a key type.
 *
 * <p>equals() and hashCode() agree with compareTo(): two entries are equal when both userIP and
 * timestamp match, and equal entries always share a hash because the hash is derived from
 * userIP alone.
 */
public class LogWritable implements WritableComparable<LogWritable> {

    private Text userIP, timestamp, request;
    private IntWritable responseSize, status;

    public LogWritable() {
        this.userIP = new Text();
        this.timestamp = new Text();
        this.request = new Text();
        this.responseSize = new IntWritable();
        this.status = new IntWritable();
    }

    /** Overwrites all fields in place. Note the parameter order: bytes comes before status. */
    public void set(String userIP, String timestamp, String request, int bytes, int status) {
        this.userIP.set(userIP);
        this.timestamp.set(timestamp);
        this.request.set(request);
        this.responseSize.set(bytes);
        this.status.set(status);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userIP.readFields(in);
        timestamp.readFields(in);
        request.readFields(in);
        responseSize.readFields(in);
        status.readFields(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        userIP.write(out);
        timestamp.write(out);
        request.write(out);
        responseSize.write(out);
        status.write(out);
    }

    /** Orders by userIP, breaking ties by timestamp (both compared as Text). */
    @Override
    public int compareTo(LogWritable o) {
        int byIp = userIP.compareTo(o.userIP);
        return byIp != 0 ? byIp : timestamp.compareTo(o.timestamp);
    }

    /** Equal exactly when compareTo() would return 0. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof LogWritable)) {
            return false;
        }
        LogWritable other = (LogWritable) obj;
        return userIP.equals(other.userIP) && timestamp.equals(other.timestamp);
    }

    @Override
    public int hashCode() {
        return userIP.hashCode();
    }

    public Text getUserIP() {
        return userIP;
    }

    public Text getTimestamp() {
        return timestamp;
    }

    public Text getRequest() {
        return request;
    }

    public IntWritable getResponseSize() {
        return responseSize;
    }

    public IntWritable getStatus() {
        return status;
    }
}
- step3:创建com.simple06.LogProcessorMap的Java类,它继承自Mapper类。编辑源代码如下:
package com.simple06;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LogProcessorMap extends Mapper<LongWritable, Text, Text, LogWritable> {
LogWritable outValue = new LogWritable();
Text outKey = new Text();
URI[] localCachePath;
HashMap<String, String> maps = new HashMap<String, String>();
@Override
public void setup(Context context) throws IOException {
URI[] localCachePath = context.getCacheFiles();
FileSystem fs = FileSystem.get(localCachePath[0], context.getConfiguration());
FSDataInputStream hdfsInStream = fs.open(new Path(localCachePath[0].getPath()));
String line = "";
line = hdfsInStream.readLine();
while (line != null) {
String[] items = line.split(" ");
maps.put(items[0], items[1]);
line = hdfsInStream.readLine();
}
hdfsInStream.close();
}
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String logEntryPattern = "^(S+) (S+) (S+) [([w:/]+s[+-]d{4})] " (. + ?)" (d{3}) (d+)";
Pattern p = Pattern.compile(logEntryPattern);
Matcher matcher = p.matcher(value.toString());
if (!matcher.matches()) {
System.err.println("Bad Record : " + value);
return;
}
String userIP = matcher.group(1);
String timestamp = matcher.group(4);
String request = matcher.group(5);
int status = Integer.parseInt(matcher.group(6));
int bytes = Integer.parseInt(matcher.group(7));
if (maps.get(userIP) != null) {
userIP = maps.get(userIP);
}
outKey.set(userIP);
outValue.set(userIP, timestamp, request, bytes, status);
context.write(outKey, outValue);
}
}
- step4:创建com.simple06.LogProcessorReduce的Java类,它继承自Reducer类。编辑源代码如下:
package com.simple06;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LogProcessorReduce extends Reducer<Text, LogWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<LogWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (LogWritable val : values) {
sum += val.getResponseSize().get();
}
result.set(sum);
context.write(key, result);
}
}
- step5:创建com.simple06.LogProcessorDriver的Java类,这是驱动程序类。编辑源代码如下:
package com.simple06;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the distributed-cache example. It ships ip2locale.txt to every task and sums the
 * response bytes per locale.
 *
 * <p>Usage: {@code LogProcessorDriver [input [output [cacheFile]]]}. These are the arguments left
 * after ToolRunner has consumed the generic Hadoop options. The defaults point at the files
 * uploaded to the HDFS {@code /data} directory. Output goes to {@code /data/log-output3}, the
 * directory inspected afterwards. The output directory must not exist yet.
 */
public class LogProcessorDriver extends Configured implements Tool {

    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/data/NASA_log_sample.txt";
    private static final String DEFAULT_OUTPUT = "hdfs://localhost:9000/data/log-output3";
    private static final String DEFAULT_CACHE_FILE = "hdfs://localhost:9000/data/ip2locale.txt";

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new LogProcessorDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        String cacheFile = args.length > 2 ? args[2] : DEFAULT_CACHE_FILE;

        Job job = Job.getInstance(getConf(), "log-analysis");
        // The mapper reads the first registered cache file as the IP to locale mapping.
        job.addCacheFile(new URI(cacheFile));
        job.setJarByClass(LogProcessorDriver.class);
        job.setMapperClass(LogProcessorMap.class);
        job.setReducerClass(LogProcessorReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
(4)程序测试及运行
运行LogProcessorDriver类文件,在Eclipse控制台如果无错误日志产生,则程序运行完毕且正确。
程序执行完毕之后,查看输出结果。在终端窗口中,执行如下命令:
hdfs dfs -tail /data/log-output3/part-r-00000
可以看到如下的计算结果:
八、MapReduce结果多路输出
掌握使用MultipleOutputs将计算结果输出到多个文件夹或者文件中。
1.实验内容
有以下销售数据,希望按商品类别分别存储,相同ID的商品存储到同一目录下。
1512,iphone5s,4英寸,指纹识别,A7处理器,64位,M7协处理器,低功耗
1512,iphone5,4英寸,A6处理器,IOS7
1512,iphone4s,3.5英寸,A5处理器,双核,经典
50019780,ipad,9.7英寸,retina屏幕,丰富的应用
50019780,yoga,联想,待机18小时,外形独特
50019780,nexus 7,华硕&google,7英寸
50019780,ipad mini 2,retina显示屏,苹果,7.9英寸
1101,macbook air,苹果超薄,OS X mavericks
1101,macbook pro,苹果,OS X lion
1101,thinkpad yoga,联想,windows 8,超级本
2.实验原理
我们可以使用Hadoop的MultipleOutputs特性从MapReduce产生(emit)出多个输出。
当我们想把不同的输出写到不同的文件中时,或者需要在job的主输出之外再输出一份额外的结果时,这个特性很有用。
这个MultipleOutputs特性也允许我们为每个输出指定一个不同的OutputFormat。
只需要在Mapper类或Reducer类中加入如下代码:
// Side-output writer. Its type arguments must match the enclosing Mapper/Reducer's
// output key/value types (Text and IntWritable in this snippet).
private MultipleOutputs<Text, IntWritable> mos;
@Override
public void setup(Context context) throws IOException,InterruptedException {
    mos = new MultipleOutputs<Text, IntWritable>(context);
}
@Override
public void cleanup(Context context) throws IOException,InterruptedException {
    // Close it when the task finishes so all extra outputs are flushed and closed.
    mos.close();
}
然后就可以用mos.write(Key key,Value value,String baseOutputPath)代替context.write(key, value)
3.实验步骤
(1) 启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
- step1:在终端窗口中,执行如下命令,启动HDFS集群。
cd ~/data/bigdata/hadoop-3.3.0/sbin/
./start-all.sh
- step2:在终端窗口中,查看启动的服务,确保如下的进程都已正常启动:
jps
2998 ResourceManager
3110 NodeManager
2488 NameNode
2626 DataNode
2812 SecondaryNameNode
(2)准备数据文件★
- step1:查看商品数据文件的内容。在终端窗口中,执行如下命令:
cat ~/dataset/products.txt
可以看到,文件内容如下:
1512,iphone5s,4英寸,指纹识别,A7处理器,64位,M7协处理器,低功耗
1512,iphone5,4英寸,A6处理器,IOS7
1512,iphone4s,3.5英寸,A5处理器,双核,经典
50019780,ipad,9.7英寸,retina屏幕,丰富的应用
50019780,yoga,联想,待机18小时,外形独特
50019780,nexus 7,华硕&google,7英寸
50019780,ipad mini 2,retina显示屏,苹果,7.9英寸
1101,macbook air,苹果超薄,OS X mavericks
1101,macbook pro,苹果,OS X lion
1101,thinkpad yoga,联想,windows 8,超级本
- step2:将商品数据文件上传至HDFS的/data目录下(若/data目录尚不存在,请先执行 hdfs dfs -mkdir -p /data)。在终端窗口中,执行如下命令:
hdfs dfs -put ~/dataset/products.txt /data
(3)创建Map/Reduce项目
- step1:打开Eclipse开发工具,创建Java项目并命名为Hadoop3Demo,并将Hadoop相关的jar包导入到项目的构建路径(Build Path)中。
- step2:创建com.simple07.MultiOutPutMapper的Java类,它继承自Mapper类。编辑源代码如下:
package com.simple07;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Keys each product line by its numeric category id (the text before the first comma) and
 * passes the original line through as the value.
 * Blank lines are ignored. Lines whose id is not an integer are reported, counted and skipped,
 * rather than failing the whole task.
 */
public class MultiOutPutMapper extends Mapper<LongWritable, Text, IntWritable, Text> {

    /** Reused output key. */
    private final IntWritable outKey = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        String idField = line.split(",", 2)[0].trim();
        // A UTF-8 byte-order mark (common in files saved by Windows editors) is not removed by
        // trim() and would make parseInt fail on the first line.
        if (!idField.isEmpty() && idField.charAt(0) == '\uFEFF') {
            idField = idField.substring(1);
        }
        int categoryId;
        try {
            categoryId = Integer.parseInt(idField);
        } catch (NumberFormatException e) {
            System.err.println("Skipping line with non-numeric category id: " + line);
            context.getCounter("MultiOutPutMapper", "BAD_CATEGORY_ID").increment(1);
            return;
        }
        outKey.set(categoryId);
        context.write(outKey, value);
    }
}
- step3:创建com.simple07.MultiOutPutReducer的Java类,它继承自Reducer类。编辑源代码如下:
package com.simple07;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
/**
 * Writes every product line twice through MultipleOutputs:
 * <ul>
 *   <li>to the "KeySpilt" named output under a sub-directory named after the category id;</li>
 *   <li>to the "AllPart" named output, which collects every line.</li>
 * </ul>
 * Nothing is written through context.write(). Both named outputs must be registered in the
 * driver under exactly these names (including the "KeySpilt" spelling).
 */
public class MultiOutPutReducer extends Reducer<IntWritable, Text, NullWritable, Text> {

    /** Side-output writer; created in setup(), closed and cleared in cleanup(). */
    private MultipleOutputs<NullWritable, Text> multipleOutputs = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        final NullWritable noKey = NullWritable.get();
        for (Text line : values) {
            // Base path "<id>/" places the file inside a directory named after the category.
            String categoryDir = key.toString() + "/";
            multipleOutputs.write("KeySpilt", noKey, line, categoryDir);
            multipleOutputs.write("AllPart", noKey, line);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (multipleOutputs == null) {
            return;
        }
        multipleOutputs.close();
        multipleOutputs = null;
    }
}
- step4:创建com.simple07.MultiOutPutDriver的Java类,这是驱动程序类。编辑源代码如下:
package com.simple07;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
 * Driver for the MultipleOutputs example. It splits products.txt by category id into the
 * "KeySpilt" named output and also writes every line to the "AllPart" named output.
 *
 * <p>Usage: {@code MultiOutPutDriver [input [output]]}. The default input is the file uploaded to
 * the HDFS {@code /data} directory. The default output is the local directory
 * {@code /root/dataset/multi}, which the verification steps list with {@code ls}/{@code cat};
 * pass an {@code hdfs://} path to write to HDFS instead. Any existing output directory is
 * deleted first.
 */
public class MultiOutPutDriver {

    private static final String DEFAULT_INPUT = "hdfs://localhost:9000/data/products.txt";
    private static final String DEFAULT_OUTPUT = "file:///root/dataset/multi";

    public static void main(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multi-output");
        job.setJarByClass(MultiOutPutDriver.class);
        job.setMapperClass(MultiOutPutMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(MultiOutPutReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(input));
        // Names must match those used by MultiOutPutReducer ("KeySpilt" spelling included).
        MultipleOutputs.addNamedOutput(job, "KeySpilt", TextOutputFormat.class, NullWritable.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "AllPart", TextOutputFormat.class, NullWritable.class, Text.class);

        Path outPath = new Path(output);
        // Resolve the file system from the output path itself, so the existence check and the
        // delete hit the same FS the job will write to (local or HDFS), not just the default FS.
        FileSystem fs = outPath.getFileSystem(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
(4)程序测试及运行
- step1:运行MultiOutPutDriver类文件,在Eclipse控制台如果无错误日志产生,则程序运行完毕且正确。
- step2:程序执行完毕之后,查看输出结果。在终端窗口中,执行如下命令:
ls /root/dataset/multi
可以看到如下的计算结果:
注意:这里多路输出的结果输出在本地文件系统上!!!
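- multipleOutputs.write(key, value, baseOutputPath)方法的第三个参数baseOutputPath(在本例使用的四参数重载write(namedOutput, key, value, baseOutputPath)中为最后一个参数)指定输出文件的基础路径,该路径相对于用户指定的输出目录
- 如果baseOutputPath不包含文件分隔符“/”,那么输出文件名的格式为baseOutputPath-r-nnnnn(即name-r-nnnnn)
- 如果baseOutputPath包含文件分隔符“/”,如baseOutputPath=“029070-99999/1901/part”,则输出文件为“029070-99999/1901/part-r-nnnnn”
- 本例中baseOutputPath为商品ID加上“/”(如“1512/”),因此输出文件为“1512/-r-00000”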
- step3:查看AllPart这一路的输出结果。在终端窗口中,执行如下命令:
cat /root/dataset/multi/AllPart-r-00000
可以看到如下的内容:
- step4:根据key查看对应的商品信息,例如ID为“1512”的商品。在终端窗口中,执行如下命令:
cat /root/dataset/multi/1512/-r-00000
可以看到如下的内容:
|