目录
一.MapReduce的运行步骤(待补充)
二.编程模板
? ? ? ?1.自定义?Mapper类继承Mapper,并重写map方法:
????????2.自定义Reducer类集成Reducer,并重写Reduce方法
??? ? ? ? 3.Driver 主入口,整合mapper和reducer
? ? ? ? 4.idea 打成jar包
? ? ? ? 5.服务器执行hadoop jar?
一.MapReduce的运行步骤(待补充)
二.编程模板
? ? ? ? 按照以下的1,2,3模板进行编程,然后打成jar包,放在服务器上运行。
????????以耳熟能详的WordCount举例,统计多个文件中每个单词的数量。
? ? ? ?1.自定义?Mapper类继承Mapper,并重写map方法:
? ? ? ? map方法一般是读取Hdfs上的数据,默认是一行行读取,有多少行,就调用了多少次map方法:
package com.xxj.hadoop.config.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* 自定义类 extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* 输入:一行行读取
* KEYIN, 输入的key的类型 这里指的是每一行的起始偏移量 long
* VALUEIN 输入的value的类型 这里指的是每一行的内容和偏移量一一对应的 String
* 输出:
* KEYOUT, 输出的键的类型 ,类型取决于 业务
* VALUEOUT,输出的值的类型,类型取决于 业务
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split(",");
//比如每一行aa bb cc,根据逗号, 切分后,把字符串为key,value为1
for (String word : split) {
Text text = new Text(word);
IntWritable num = new IntWritable(1);
context.write(text, num);
}
}
}
? ? ? ? 其中:mapper后的四个泛型<KEYIN, VALUEIN, KEYOUT, VALUEOUT>:
???输入:一行行读取
????????KEYIN 输入的key的类型 这里指的是每一行的起始偏移量,long,没有实际意义,hadoop的底层数据读取的时候是按字节读取的
????????VALUEIN 输入的value的类型 这里指的是每一行的内容和偏移量一一对应的 String
???输出:
????????KEYOUT, 输出的键的类型 ,类型取决于 业务
????????VALUEOUT,输出的值的类型,类型取决于 业务
???????这里的数据类型 ?不能使用java的原生类型,如int,String。
? ? ? ? 原因:?
? ? ? ? (1)首先在数据存储和数据网络传输中,需要将数据序列化和反序列化,如传输:“顽强的豆芽”,首先会将“顽强的豆芽” 序列化为 1000101001,传输完成,读取的时候,会将1000101001反序列化为我们看得懂的“顽强的豆芽”。
? ? ? ? (2)Java的原生类型,如int,String,Long虽然实现了Java的序列化Serializable接口,但是Serializable接口太重且繁琐,因为它不仅会将值序列化,也会将相关类也序列化,所以Hadoop自己实现的自己的一套序列化和反序列化的接口:Writable,只会对数据的值进行序列化和反序列化。
? ? ? ? Hadoop也实现一些基本数据类型的序列化类,可以供我们使用:
Java | Hadoop | int | IntWritable | long | LongWritable | string | Text | byte | ByteWritable | double | DoubleWritable | float | FloatWritable | boolean | BooleanWritable | null | NullWritable |
???????????????自己定义的需要序列化和反序列化可以通过实现 Writable接口来使用。
? ? ? ? ? ? ? ? 在重写map方法时,如果中间处理数据时将类型转化为Java的数据类型,将结果写入上下文对象Context,要重新转为Hadoop的类型。
????????2.自定义Reducer类集成Reducer,并重写Reduce方法
? ? ? ? ?reducer是读取map初步处理好的数据,做数据计算,比如求和,求最大值,最小值等等,
package com.xxj.hadoop.config.wc;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//循环遍历values 求和
int sum=0;
for(IntWritable v:values){
//mapreduce的框架已经帮我们做好了从map出来后已经做好按key分组,
// 也就是到这里的,Iterable<IntWritable> values 是同一个单词的数量迭代器,进行相加就可以得到最后的数量
//类似于{"aa":[1,1,1,1,1]},所以统计aa单词出现的个数的话,只需要将迭代器中的[1,1,1,1,1]相加就可以得出总数
sum+=v.get();
}
//写出结果文件
IntWritable rv=new IntWritable(sum);
context.write(key, rv);
}
}
?? ? ? ? ? 3.Driver 主入口,整合mapper和reducer
? ? ? ? Driver类中有入口main方法,这个方法主要(1)开启一个job (2)指定mapper类和reducer类,然后程序不知道这两个类是在jar中哪个文件,(3)设置map输出key value的类型和设置reduce输出key value的类型,虽然在具体的mapper和reducer类已经指定类型,但是因为泛型编译的时候生效,运行的时候泛自动擦除,所以这主函数需要再设置,
(3)创建读取流 FileInputFormat 来读取hdfs的数据,一行行(底层是文件读取器:LineRecordReader
)传入map方法,通过map方法,和框架的shuffle过程(在框架里面,我们不用写代码,自动帮我们实现,主要是实现,分区 Partitioner ,排序 sort ,分组 group, combiner组件,这个在mapreducer过程解析会提到),以及reduce方法,也会(4)创建输出流FileOutputFormat 将结果输出的hdfs的指定位置。(5)job提交语句:job.waitForCompletion(true) ,true表示需要打印日志
?
package com.xxj.hadoop.config.wc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
public static void main(String[] args) {
//将mapper reducer类进行一个封装 封装为一个任务----job(作业)
//加载配置文件
Configuration conf = new Configuration();
//启动一个Job 创建一个job对象
try {
Job job = Job.getInstance(conf);
//设置这个job
//设置整个job的主函数入口
job.setJarByClass(Driver.class);
//设置job的mappper的类
job.setMapperClass(WordCountMapper.class);
//设置job的reducer的类
job.setReducerClass(WordCountReducer.class);
//设置map输出key value的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置reduce的输出的k v类型 以下方法设置的是mr的最终输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要统计的文件的输入路径 FileInputFormat 文件输入类
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);
//指定输出目录 输出路径不能存在的 否则会报错 默认输出是覆盖式的输出 如果输出目录存在 有可能造成原始数据的丢失
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);
//提交job 执行这一句的时候 job才会提交 上面做的一系列的工作 都是设置job
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
????????
? ? ? ?
? ? ? ? 4.idea 打成jar包
?选择file---->project Structure
?
?按完apply之后,可以选择主文件,这样到时候在服务器上运行jar包的时候就不用指定主函数文件路径,
?
然后再就会out文件夹出现打的jar包
?上hadoop集群的任一节点:
在之前上传了数据3份数据:
?每份数据为:
?
? ? ? ? 5.服务器执行hadoop jar?
???????????????hadoop jar wc01.jar /wcnew/ /wcoutnew/
最后查询得出的结果:
?
?
?
?
|