来源:《hadoop大数据开发实战》
实验一:统计乘用车辆和商用车辆的数量和销售额分布
设计思路:
首先,写一个Mapper来映射输出所有乘用车辆(feiyingyun)和商用车辆(yingyun)的记录。
然后,写一个reduce统计出乘用车辆和商用车辆各自的数量,写一个map的映射集合中,其中key是车辆类型,value为车辆类型的数量。
同时,定义一个成员变量,统计乘用车辆和商用车辆的总和。
最后,重写reduce中的cleanup方法,在其中计算出乘用车辆和商用车辆各自的销售额分布
然后,输出到HDFS分布式文件系统中。
程序代码:
package car;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
class CountMap extends Mapper<LongWritable,Text,Text,LongWritable>{
public void map(
LongWritable key,
Text value,
org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws java.io.IOException,InterruptedException{
String[] owns=value.toString().trim().split(",");
if(null!=owns&&owns.length>2&&owns[2]!=null) {
if(owns[2].equals("feiyingyun")) {
context.write(new Text("chengyong"), new LongWritable(1));
}else {
context.write(new Text("shangyong"), new LongWritable(1));
}
}
}
}
class CountReduce extends Reducer<Text,LongWritable,Text,DoubleWritable>{
Map<String,Long>maps=new HashMap<String,Long>();
double all=0;
public void reduce(Text key,java.lang.Iterable<LongWritable>values,org.apache.hadoop.mapreduce.Reducer<Text,LongWritable,Text,DoubleWritable>.Context context) throws java.io.IOException,InterruptedException{
long sum=0;
for(LongWritable val:values) {
sum+=val.get();
}
all+=sum;
maps.put(key.toString(), sum);
};
protected void cleanup(org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, DoubleWritable>.Context context)throws java.io.IOException,InterruptedException {
Set<String>keySet=maps.keySet();
for(String str:keySet) {
long value=maps.get(str);
double percent=value/all;
context.write(new Text(str),new DoubleWritable(percent));
}
};
}
public class Car{
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();
Job job1=Job.getInstance(conf,Car.class.getName());
job1.setJarByClass(Car.class);
job1.setMapperClass(CountMap.class);
job1.setReducerClass(CountReduce.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(LongWritable.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1,new Path(args[1]));
job1.waitForCompletion(true);
}
}
car.txt?
shanxi,3,feiyingyun
shanxi,3,yingyun
shanxi,3,feiyingyun
shanxi,3,yingyun
shanxi,3,feiyingyun
shanxi,3,feiyingyun
shanxi,3,feiyingyun
shanxi,3,yingyun
shanxi,3,feiyingyun
shanxi,3,yingyun
shanxi,3,feiyingyun
shanxi,3,feiyingyun
shanxi,3,yingyun
?将上述代码在Eclipse中打包为CountCar.jar。
接下来将car.txt上传到Hadoop分布式文件系统HDFS的根目录下,之后提交本次Job程序。
hadoop jar ./CountCar.jar /car.txt /car
实验二:统计某年每个月的汽车销售数量的比例
设计思路:
通过一个Mapper映射输出每个月份的汽车销售记录
再通过一个reduce计算出每个月份的销售总数
同时将所有月份的销售数量进行累加
然后用每个月份的汽车销售总数除以各个月份的销售总和,就计算出了每个月的汽车销售数量的比例。
程序代码:
package car1;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
class MouthMap extends Mapper<Object,Text,Text,IntWritable>{
public void map(
Object key,
Text value,
org.apache.hadoop.mapreduce.Mapper<Object, Text, Text, IntWritable>.Context context)
throws java.io.IOException,InterruptedException{
String[] str=value.toString().trim().split(",");
if(null!=str&&str[1]!=null) {
context.write(new Text(str[0]), new IntWritable(Integer.parseInt(str[1])));
}
}
}
class MouthReduce extends Reducer<Text,IntWritable,Text,DoubleWritable>{
Map<String,Integer>map=new HashMap<String,Integer>();
int all=0;
public void reduce(Text key,java.lang.Iterable<IntWritable>value,org.apache.hadoop.mapreduce.Reducer<Text,IntWritable,Text,DoubleWritable>.Context context) throws java.io.IOException,InterruptedException{
int count=0;
for(IntWritable con:value) {
count+=con.get();
}
all+=count;
map.put(key.toString(), count);
};
protected void cleanup(org.apache.hadoop.mapreduce.Reducer<Text, IntWritable, Text, DoubleWritable>.Context context)throws java.io.IOException,InterruptedException {
Set<String>keys=map.keySet();
for(String key:keys) {
int value=map.get(key);
double percent=value*1.0/all;
context.write(new Text(key),new DoubleWritable(percent));
}
};
}
public class MouthCount2 {
public static void main(String[] args)throws Exception{
Configuration conf=new Configuration();
Job job=Job.getInstance(conf,MouthCount2.class.getName());
job.setJarByClass(MouthCount2.class);
job.setMapperClass(MouthMap.class);
job.setReducerClass(MouthReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.waitForCompletion(true);
}
}
?car_month.txt
1,10
2,12
3,10
4,8
5,9
6,12
7,1
8,2
9,3
10,4
11,5
12,6
下面操作与实验一类似
?
|