- Count how many times each word appears. Words beginning with a-n go to one output file, words beginning with o-z to a second file, and everything else to a third file.
1.1 Custom Mapper class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reuse the output key/value objects instead of allocating per record.
    private final Text k2 = new Text();
    private final IntWritable v2 = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // value holds one line of input; split it into words on spaces.
        String[] words = value.toString().split(" ");
        for (String word : words) {
            k2.set(word);
            context.write(k2, v2); // emit (word, 1) for every occurrence
        }
    }
}
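A worked example on a hypothetical input line hello world hello: map() is called once for the line and emits (hello,1), (world,1), (hello,1). The framework then sorts and groups these pairs by key before they reach the reducer.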
1.2 Custom Reducer class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s the mapper emitted for this word.
        int sum = 0;
        for (IntWritable intWritable : v2) {
            sum += intWritable.get();
        }
        context.write(k2, new IntWritable(sum));
    }
}
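Continuing the hypothetical example: after the shuffle, reduce() is called once per distinct word, e.g. with k2 = hello and v2 = [1, 1], and writes (hello, 2).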
1.3 Custom partitioning rule
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordcountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // Route words starting with a-n to partition 0, o-z to partition 1,
        // and everything else (digits, punctuation, ...) to partition 2,
        // matching the three output files required by the task.
        if (text.toString().matches("^[a-nA-N].*")) {
            return 0;
        } else if (text.toString().matches("^[o-zO-Z].*")) {
            return 1;
        }
        return 2;
    }
}
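A minimal local check of the partitioning rule (a standalone sketch; the class name PartitionerCheck is hypothetical, and the third argument is the number of reduce tasks):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        WordcountPartitioner p = new WordcountPartitioner();
        System.out.println(p.getPartition(new Text("apple"), new IntWritable(1), 3)); // 0 (a-n)
        System.out.println(p.getPartition(new Text("zebra"), new IntWritable(1), 3)); // 1 (o-z)
        System.out.println(p.getPartition(new Text("42"),    new IntWritable(1), 3)); // 2 (other)
    }
}
Note that getPartition must return a value in [0, numReduceTasks), which is why the driver below sets three reduce tasks.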
1.4 Driver class: configure and submit the job
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Three partitions: a-n, o-z, and everything else.
            job.setNumReduceTasks(3);
            job.setPartitionerClass(WordcountPartitioner.class);
            FileInputFormat.addInputPath(job, new Path("E:/y/*"));
            // Delete the output directory if it already exists; otherwise
            // the job fails with FileAlreadyExistsException.
            Path path = new Path("E:/y/output");
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
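With local paths such as E:/y, the job runs in Hadoop's local mode; the output directory will contain one file per reduce task (part-r-00000 for a-n, part-r-00001 for o-z, part-r-00002 for the rest). To run on a cluster instead, package the classes into a jar and submit it with hadoop jar, pointing the paths at HDFS.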
- Draw a diagram of the introductory wordcount example.
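A text version of the diagram, traced on a hypothetical two-line input ("hello world" and "hello hadoop"):
split 0: "hello world"            split 1: "hello hadoop"
        |                                 |
      map()                             map()
        |                                 |
(hello,1) (world,1)             (hello,1) (hadoop,1)
         \                               /
  shuffle: route by WordcountPartitioner, sort & group by key
        |                    |                    |
  reduce 0 (a-n)       reduce 1 (o-z)       reduce 2 (other)
  (hadoop,[1])         (world,[1])          (no keys)
  (hello,[1,1])              |                    |
        |                    |                    |
  part-r-00000         part-r-00001         part-r-00002
  hadoop  1            world   1            (empty)
  hello   2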
- The data is as follows:
7369,SMITH,CLERK,7902,1980-12-17,800,null,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7566,JONES,MANAGER,7839,1981-04-02,2975,null,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10
7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20
7839,KING,PRESIDENT,null,1981-11-17,5000,null,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20
7900,JAMES,CLERK,7698,1981-12-03,950,null,30
7902,FORD,ANALYST,7566,1981-12-02,3000,null,20
7934,MILLER,CLERK,7782,1982-01-23,1300,null,10
Use an MR program to count the number of people hired in each year.
The final result must satisfy the following requirements:
1. Output format (the reducer emits the literal labels 年份 "year" and 人数 "count"):
年份:1980 人数:xxx
年份:1981 人数:xxx
.......
2. Two partitions:
partition 0 stores records with year of hire < 1982
partition 1 stores records with year of hire >= 1982
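Before the Mapper, a minimal standalone sketch of the year-extraction step on one record from the data above (the class name YearExtractDemo is hypothetical):
public class YearExtractDemo {
    public static void main(String[] args) {
        String line = "7369,SMITH,CLERK,7902,1980-12-17,800,null,20";
        // Field 4 (0-based) is the hire date; the year is the part before the first '-'.
        String year = line.split(",")[4].split("-")[0];
        System.out.println(year); // 1980
    }
}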
PersonCountMapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class PersonCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // Field 4 of each CSV record is the hire date (yyyy-MM-dd);
        // extract the year and emit (year, 1).
        String year = v1.toString().split(",")[4].split("-")[0];
        context.write(new Text(year), new IntWritable(1));
    }
}
PersonCountReducer
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class PersonCountReducer extends Reducer<Text, IntWritable, Text, Text> {
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2, Context context)
            throws IOException, InterruptedException {
        // Count how many people were hired in this year.
        int sum = 0;
        for (IntWritable intWritable : v2) {
            sum += intWritable.get();
        }
        // Required output format: 年份:<year>  人数:<count>
        context.write(new Text("年份:" + k2), new Text("人数:" + sum));
    }
}
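Note that this Reducer outputs (Text, Text), not (Text, IntWritable), so the driver must declare Text as the output value class, as shown below.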
PersonCountPartitioner
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PersonCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int numPartitions) {
        // Keys are four-digit years; hires before 1982 go to partition 0,
        // 1982 and later to partition 1.
        int year = Integer.parseInt(text.toString());
        return (year < 1982) ? 0 : 1;
    }
}
PersonCountDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PersonCountDriver {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(PersonCountDriver.class);
            job.setMapperClass(PersonCountMapper.class);
            job.setReducerClass(PersonCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            // The reducer emits Text values, so Text must be declared here.
            job.setOutputValueClass(Text.class);
            // Two partitions: years < 1982 and years >= 1982.
            job.setNumReduceTasks(2);
            job.setPartitionerClass(PersonCountPartitioner.class);
            FileInputFormat.addInputPath(job, new Path("E:/y/person.txt"));
            // Remove any existing output directory before running.
            Path path = new Path("E:/y/output2");
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
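For the 14 records above, the expected output (key and value separated by TextOutputFormat's default tab) would be:
part-r-00000 (years < 1982):
年份:1980	人数:1
年份:1981	人数:10
part-r-00001 (years >= 1982):
年份:1982	人数:1
年份:1987	人数:2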