Mapper
public class workcountmapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    // Reusable output objects: the key holds the current word, the value is a constant 1.
    Text outk = new Text();
    IntWritable outv = new IntWritable(1);

    /**
     * Tokenizes one input line on single spaces and emits (word, 1) for
     * every token. The sums are computed later in the reducer.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the raw line into space-separated tokens.
        String[] tokens = value.toString().split(" ");
        for (String token : tokens) {
            // Reuse the Text key instead of allocating per token.
            outk.set(token);
            context.write(outk, outv);
        }
    }
}
reduce
public class workcountreduce extends Reducer<Text, IntWritable,Text,IntWritable> {
    // Reused output value holding the summed count for the current key.
    IntWritable outk = new IntWritable();

    /**
     * Adds up all the 1-counts emitted by the mapper for this word and
     * writes a single (word, total) pair.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        outk.set(total);
        context.write(key, outk);
    }
}
wordcount 驱动类（Driver）
public class workcountdriver {
    /**
     * Driver for the word-count job: wires up the mapper/reducer, declares
     * the intermediate and final output types, sets input/output paths,
     * then submits the job and exits with its status.
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job from a fresh configuration.
        final Configuration conf = new Configuration();
        final Job job = Job.getInstance(conf);
        // 2. Locate the jar that carries this driver class.
        job.setJarByClass(workcountdriver.class);
        // Wire up the mapper and reducer implementations.
        job.setMapperClass(workcountmapper.class);
        job.setReducerClass(workcountreduce.class);
        // Map-phase (intermediate) output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        // BUG FIX: the original called setMapOutputValueClass a second time
        // here, leaving the job's final output value class unconfigured.
        job.setOutputValueClass(IntWritable.class);
        // Input/output locations (hard-coded local paths).
        FileInputFormat.setInputPaths(job,new Path("D:\\ChangZhi\\mr测试文档\\workcount\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\ChangZhi\\mr测试文档\\workcount\\output2"));
        // Submit, block until completion, and report success via exit code.
        final boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
2. 简单实现MR读写文件
mapper阶段
public class AcMapper extends Mapper<LongWritable, Text,Text,UserAction> {
    // Reused output key to avoid allocating a new Text for every record.
    // (The unused IntWritable field the original declared has been removed.)
    Text outKey = new Text();

    /**
     * Parses one comma-separated log line and emits
     * (userId + actionType, UserAction(..., 1)).
     *
     * NOTE(review): assumes each line has at least 7 comma-separated
     * columns, with the action type at index 5 and the user id at index 6
     * — TODO confirm against the actual log schema.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the CSV record into its columns.
        String[] infos = value.toString().split(",");
        // Composite grouping key: column 6 followed by column 5.
        String composite = infos[6] + infos[5];
        outKey.set(composite);
        // Count of 1 per occurrence; totals are summed in the reducer.
        context.write(outKey, new UserAction(composite, 1));
    }
}
reduce阶段
public class AcReduce extends Reducer<Text,UserAction,Text, IntWritable> {
    /**
     * Sums the per-record counts for one (userId + actionType) key —
     * grouping was already done by the shuffle — and emits the total.
     */
    @Override
    protected void reduce(Text key, Iterable<UserAction> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (UserAction action : values) {
            total += action.getCnt();
        }
        context.write(key, new IntWritable(total));
    }
}
书写实体类对象方法
public class UserAction implements WritableComparable<UserAction> {
    // Composite key: user id concatenated with action type.
    private String useridAndAct;
    // Occurrence count for this (user, action) pair.
    private int cnt;

    /** No-arg constructor required by Hadoop's reflection-based deserialization. */
    public UserAction() {
    }

    public UserAction(String useridAndAct, int cnt) {
        this.useridAndAct = useridAndAct;
        this.cnt = cnt;
    }

    public String getUseridAndAct() {
        return useridAndAct;
    }

    public void setUseridAndAct(String useridAndAct) {
        this.useridAndAct = useridAndAct;
    }

    public int getCnt() {
        return cnt;
    }

    public void setCnt(int cnt) {
        this.cnt = cnt;
    }

    /**
     * Orders by the composite user/action key, then by count.
     * BUG FIX: the original always returned 0, which breaks sorting
     * whenever this type is used as a map output key.
     */
    @Override
    public int compareTo(UserAction o) {
        int byKey = this.useridAndAct.compareTo(o.useridAndAct);
        return byKey != 0 ? byKey : Integer.compare(this.cnt, o.cnt);
    }

    /** Serializes the fields; the order must match readFields exactly. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(useridAndAct);
        out.writeInt(cnt);
    }

    /**
     * BUG FIX: the original used in.readLine(), which does not invert the
     * writeUTF() framing used by write() (writeUTF prefixes a 2-byte
     * length, not a newline), corrupting every deserialized record.
     * readUTF() is the correct inverse.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.useridAndAct = in.readUTF();
        this.cnt = in.readInt();
    }
}
test
public class AcDemo {
    /**
     * Driver: counts user actions from HDFS logs under /log/202001 and
     * writes the totals to a local directory.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Path inputPath = new Path("hdfs://192.168.80.181:9000/log/202001");
        Path outPutPath = new Path("file:///D:/io流/res1");
        // Run as "root" so HDFS permission checks pass.
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        // Delete the output directory if it already exists (Hadoop refuses
        // to overwrite). BUG FIX: resolve the FileSystem from the output
        // path itself — the original used the default FileSystem, which
        // may not match the file:// scheme of outPutPath.
        FileSystem fs = outPutPath.getFileSystem(conf);
        if (fs.exists(outPutPath)) {
            fs.delete(outPutPath, true);
        }
        // Create the job and name it for the cluster UI.
        Job job = Job.getInstance(conf);
        // BUG FIX: the original pointed at WcDemo.class (a different class);
        // the jar must be located via this driver class.
        job.setJarByClass(AcDemo.class);
        job.setJobName("ActionCount");
        // Input and output locations.
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outPutPath);
        // BUG FIX: the reducer defined in this project is AcReduce; the
        // original referenced a non-existent AcReducer class.
        job.setMapperClass(AcMapper.class);
        job.setReducerClass(AcReduce.class);
        // Map output types (they differ from the final output value type,
        // so they must be declared explicitly).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UserAction.class);
        // Final (reducer) output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit and propagate success/failure via the exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}