1、编程规范
(1)用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行 mr 程序的客户端)
(2)Mapper 的输入数据是 KV 对的形式(KV 的类型可自定义)
(3)Mapper 的输出数据是 KV 对的形式(KV 的类型可自定义)
(4)Mapper 中的业务逻辑写在 map()方法中
(5)map()方法(maptask 进程)对每一个<K,V>调用一次
(6)Reducer 的输入数据类型对应 Mapper 的输出数据类型,也是 KV
(7)Reducer 的业务逻辑写在 reduce()方法中
(8)Reducetask 进程对每一组相同 k 的<k,v>组调用一次 reduce()方法
(9)用户自定义的 Mapper 和 Reducer 都要继承各自的父类
(10)整个程序需要一个Drvier来进行提交,提交的是一个描述了各种必要信息的 job对象
2、WordCount示例编写
需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
新建maven项目进行如下操作:
2.1 引入pom.xml文件
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib</classpathPrefix>
<mainClass>cn.hadooptest.mapreduce.WordCountDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
注意:
以上pom中的mainClass是你程序运行的主类的完整路径,也就是用来描述job并提交job的类,及Driver类
2.2?定义一个mapper类
package cn.hadooptest.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
* KEYIN:表示mapper数据输入的时候key的数据类型,在默认的读取数据组件下,加InputFormat,它的行为是一行一行的读取待处理的数据,
* 读取一行,返回一行给我们的mr程序,这种情况下keyin就表示每一行的起始偏移量 因此数据类型是Long
*
* VALUEIN:表述mapper数据输入的时候value的数据类型,在默认的读取数据组件下valuein就表示读取的这一行内容 因此数据类型是String
*
* KEYOUT:表示mapper数据输出的时候key的数据类型 在本案例中 输出的key是单词 因此数据类型是 String
*
* VULUEOUT:表示mapper数据输出的时候value的数据类型 在案例当中 输出的key是单词的次数 因此数据类型是Integer
*
*这里所说的数据类型String long都是jdk自带的类型 在序列化的时候 效率低下 因此hadoop自己封装一套数据类型
*
* long->LongWritable
* String->Text
* Integer->Intwritable
* null-->NullWritable
*
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
/*
* 这里就是mapper阶段具体的业务逻辑实现方法 该方法的调用取决于读取数据的组件有没有给mr传入数据
* 如果有的话 每传入一个<k,v>对 该方法就会被调用一次
*
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
//拿到一行数据转换为 string
String line = value.toString();
//将这一行切分出各个单词
String[] words = line.split(" ");
//遍历数组,每出现一个单纯 就标记一个数字1 输出<单词,1>
for(String word:words){
//使用mr程序的上下文context 把mapper阶段处理的数据发送出去
//作为reduce节点的输入数据
context.write(new Text(word), new IntWritable(1));
}
}
}
2.3?定义一个reducer类
package cn.hadooptest.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/*
* KEYIN :就是reducer阶段输入的数据key类型,对应mapper的输出key 在本案例中 就是单词 Text
*
* VALUEIN:就是reducer阶段输入的数据value类型,对应mapper的输入value类型 在本案例中 就是单词次数 IntWritable
*
* KEYOUT:就是reducer阶段输出的数据key类型 在本案例中 就是单词 Text
*
* VALUEOUT:reducer阶段输出的数据value类型 正在本案例中 就是单词的中次数 IntWritable
* */
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
/*
*
* reduce接收所有来自map阶段处理的数据之后,按照key的字典序进行排序
* <hello,1><hadoop,1><spark,1><hadoop,1>
*排序后:
* <hadoop,1><hadoop,1><hello,1><spark,1>
*按照key是否相同作为一组去调用reduce方法
* 本方法的key就是这一组相同kv对的共同key
* 把这一组所有的v作为一个迭代器传入我们的reduce方法
* */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
InterruptedException {
//定义一个计数器
int count = 0;
//遍历这一组 kv 的所有 v,累加到 count 中
for(IntWritable value:values){
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
2.4?定义一个主类,用来描述job并提交job
package cn.hadooptest.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
* 这个类就是mr程序运行时候的主类,本类中组装了一些程序运行时候所需要的信息
* 比如:使用的是那个Mapper类 那个Reducer类 输入数据在哪 输出数据在什么地方
*
* */
public class WordCountDriver {
public static void main(String[] args) throws Exception{
//通过Job来封装本次mr的相关信息
Configuration configuration =new Configuration();
Job job = Job.getInstance(configuration);
//指定本次mr job jar包运行主类
job.setJarByClass(WordCountDriver.class);
//指定本次mr 所用的mapper reducer类分别是什么
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//指定本次mr mapper阶段的输出 k v类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定本次mr 最终输出的 k v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定本次mr 输入的数据路径 和最终输出结果存放在什么位置
FileInputFormat.setInputPaths(job,"/wordcount/input");
FileOutputFormat.setOutputPath(job,new Path("/wordcount/output"));
// job.submit();
//提交程序 并且监控打印程序的执行情况
boolean b=job.waitForCompletion(true);
System.exit(b?0:1);
}
}
编写完成后点击maven的package进行打包,会在target中生成jar包
下一个章节来讲解如何运行。
|