一、MapReduce
(一)Overview
1、MapReduce is a distributed computing model.
2、Proposed by Google and built on top of HDFS, it is aimed mainly at computing over massive amounts of data.
3、MapReduce consists of two phases, Map and Reduce; a user only needs to implement the map and reduce functions to obtain a distributed computation. The goal is to simplify the development and debugging cycle of distributed programs.
(二)Components
1、JobTracker / ResourceManager: the job scheduler; manages multiple TaskTrackers.
2、TaskTracker / NodeManager: the task executor.
At the start of the map phase, the input file is first divided into splits. A split is described by a split object, which records the start and end positions of the portion of the file to be processed; in most cases the split size equals the block size.
A MapTask is the task that runs the user's processing in the map phase; every MapTask executes the same logic, just over different data.
In the map phase, neither computing the splits nor reading the input line by line is the programmer's job; the Hadoop framework does both. The programmer only writes the map method, i.e. the business logic applied to each line after the framework has read it. By default a MapTask reads its data line by line.
Before the map output reaches the reduce phase, the framework also groups the records by the map output key; this too requires no manual work from the programmer.
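As a rough sketch of this division of labour in code (assuming the default TextInputFormat; the class and variable names below are only illustrative), each call to map receives one line, with the line's byte offset as the key:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // offset: the byte offset of this line within the file, supplied by the framework
        // line:   the text of one input line, also read by the framework
        // only the per-line business logic is written here, e.g.
        // context.write(new Text(...), new LongWritable(...));
    }
}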
(三)Serialization
1、Hadoop requires that data be serialized before it is transmitted, e.g. int → IntWritable, String → Text.
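A minimal sketch of wrapping plain Java values into these Writable types and unwrapping them again (the class and variable names below are only illustrative):
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class WritableDemo {
    public static void main(String[] args) {
        IntWritable iw = new IntWritable(5); // wrap a Java int
        int i = iw.get();                    // unwrap back to an int
        Text t = new Text("hadoop");         // wrap a Java String
        String s = t.toString();             // unwrap back to a String
        System.out.println(i + " " + s);
    }
}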
(四)Sorting
1、Hadoop sorts the map output by its key.
2、To sort by some other criterion, make that criterion part of the map output key and have the key class implement WritableComparable (see the Student example in section 五).
(五)Combiner
1、The Combiner in MapReduce exists to cut down the data transferred between map tasks and reduce tasks: Hadoop lets the user specify a combine function that is applied to each map task's output. In other words, it reduces the amount of data sent to the Reduce side, trimming the mapper output to save network bandwidth and lighten the load on the reducers.
2、Using a combiner is also one of the ways to tune a Hadoop job.
3、A combiner is useful for aggregation-style statistics but useless for sorting.
4、Unlike the Mapper and Reducer, the combiner has no default implementation; it only takes effect if it is explicitly configured on the job.
5、In general, the combiner and the reducer perform the same operation.
6、Note in particular: a combiner only processes the output of a single node (map task); it does not get the kind of input a reducer gets (data that has gone through the shuffle phase).
(六)Multiple inputs (see CharCountDriver2 in section 六 below)
(七)Data locality strategy
It is the computation that moves, not the data: TaskTrackers/NodeManagers are deployed on the same machines as the DataNodes, so a task can run where its data already lives instead of shipping data blocks across the network.
(八)Job execution flow
1、The client submits an MR jar to the JobClient (submitted with hadoop jar), which
 a. checks whether the input and output paths are valid;
 b. checks whether the input/output key-value types are correct.
2、The JobClient communicates with the ResourceManager/JobTracker over RPC and gets back a location (on HDFS) for storing the jar; a globally unique job ID is generated to identify the job.
3、The JobClient writes the jar to HDFS.
4、The job is then submitted (what is submitted is the job description — not the jar itself but the location of the jar on HDFS).
5、The JobTracker initializes the job: it reads the files to be processed on HDFS and computes the splits; each split corresponds to one MapTask.
6、TaskTrackers claim tasks through the heartbeat mechanism (respecting the data locality strategy).
7、The TaskTracker downloads the jar and starts computing (it is the computation that moves, not the data, for efficiency); this is why TaskTrackers and DataNodes are deployed on the same servers.
8、The TaskTracker starts a Java child process to run the actual computation (a MapTask or a ReduceTask).
9、The results are written back to HDFS.
(九)The shuffle process
(十)Yarn
1、YARN is a resource-scheduling framework.
2、The old JobTracker was responsible for both resource scheduling and task monitoring.
3、The ResourceManager is responsible only for resource scheduling; task monitoring is handed to an ApplicationMaster, which is started on a NodeManager.
4、Every job gets its own ApplicationMaster.
二、Preparing the MapReduce environment in Eclipse
1、Upload the input files to HDFS
2、Create a new Java Project
3、Add the dependency libraries
(1)Copy the hdfs_jar folder used earlier in B_HDFS into the new Java project (C_MapReduce).
(2)Copy the contents of the following four parts (sets of jars) into that hdfs_jar folder.
(3)Add them to the build path.
三、WordCount (count how many times each word appears)
1、WordCountMapper
package cn.edu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString(); // one line of input text per map call
String[] words = line.split(" "); // split the line into words on spaces
for (String word : words) {
context.write(new Text(word), new LongWritable(1)); // emit (word, 1) for every word
}
}
}
2、WordCountReduce
package cn.edu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> value,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable count : value) { // value holds a 1 for every occurrence of this word
sum+=count.get();
}
context.write(key, new LongWritable(sum));
}
}
3、WordCountDriver
package cn.edu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/words.txt")); // input file on HDFS
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/wordcount")); // output directory on HDFS; must not already exist
job.waitForCompletion(true);
}
}
4、Export as a JAR
(1)Select the project to export, right-click it, and choose Export.
(2)Choose Java → JAR file → Next.
(3)Expand the project with the ">" arrow in front of it, select only src, uncheck .classpath and .project, choose the destination directory, then click Next.
(4)Click Next.
(5)Select the main class → Finish.
(6)Click OK.
5、Upload the JAR to Linux and run it
(1)Upload (2)Run
hadoop jar WordCount.jar cn.edu.mapreduce.wordcount.WordCountDriver
Here cn.edu.mapreduce.wordcount.WordCountDriver is the fully qualified name of the driver class; it can be copied from Eclipse (e.g. right-click the class and choose Copy Qualified Name).
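Once the job finishes, the result can be checked directly from HDFS; assuming the /wordcount output directory configured in the driver above and the default reducer output file name, a quick look would be:
hadoop fs -cat /wordcount/part-r-00000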
6、To count characters instead of words, only two things need to change
(1)Remove the spaces (count individual characters rather than space-separated words). (2)Change the output directory. A rough sketch of the change follows.
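Keeping WordCount's Text/LongWritable output types, the map method of WordCountMapper would become roughly the following (an assumption on my part: "remove the spaces" is read here as not counting the space character; the full CharCount version in section 六 simply emits every character and uses IntWritable as the map output value):
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    for (char c : line.toCharArray()) {
        if (c != ' ') { // do not count the space character
            context.write(new Text(String.valueOf(c)), new LongWritable(1)); // emit (character, 1)
        }
    }
}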
7、Combiner (improves efficiency)
(1)WordCountCombiner.java
package cn.edu.mapreduce.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable>{
@Override
protected void reduce(Text key, Iterable<LongWritable> value,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
long sum=0;
for (LongWritable count : value) {
sum+=count.get();
}
context.write(key, new LongWritable(sum));
}
}
(2)Add the following line to WordCountDriver
job.setCombinerClass(WordCountCombiner.class);
四、Flow (traffic statistics)
1、Flow.java (sum the traffic for each phone number)
(1)Code
package cn.edu.mapreduce.flow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class Flow implements Writable{
private String phone = "";
private String address = "";
private String name = "";
private int flow ;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getFlow() {
return flow;
}
public void setFlow(int flow) {
this.flow = flow;
}
public Flow(String phone, String address, String name, int flow) {
super();
this.phone = phone;
this.address = address;
this.name = name;
this.flow = flow;
}
public Flow() {
super();
}
@Override
public String toString() {
return "Flow [phone=" + phone + ", address=" + address + ", name=" + name + ", flow=" + flow + "]";
}
@Override
public void readFields(DataInput in) throws IOException {
this.phone = in.readUTF();
this.address = in.readUTF();
this.name = in.readUTF();
this.flow = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(phone);
out.writeUTF(address);
out.writeUTF(name);
out.writeInt(flow);
}
}
(2)Tip: Shift+Alt+S opens Eclipse's Source menu (generate getters/setters, constructors, toString, etc.).
2、FlowMapper
package cn.edu.mapreduce.flow;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class FlowMapper extends Mapper<LongWritable, Text, Text, Flow> {
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
String[] datas = ivalue.toString().split(" "); // each line: phone address name flow (space-separated)
Flow f = new Flow();
f.setPhone(datas[0]);
f.setAddress(datas[1]);
f.setName(datas[2]);
f.setFlow(Integer.parseInt(datas[3]));
context.write(new Text(f.getPhone()), f);
}
}
3、FlowReducer
package cn.edu.mapreduce.flow;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class FlowReducer extends Reducer<Text, Flow, Text, IntWritable> {
public void reduce(Text _key, Iterable<Flow> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (Flow val : values) {
sum += val.getFlow();
}
context.write(_key, new IntWritable(sum));
}
}
4、FlowDriver
package cn.edu.mapreduce.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FlowDriver");
job.setJarByClass(cn.edu.mapreduce.flow.FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/flow.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/flowresult"));
if (!job.waitForCompletion(true))
return;
}
}
5、Compute the traffic per region (requires a custom partitioner)
If no custom partitioner is defined, Hadoop uses the default HashPartitioner, which assigns records to partitions by the hash of the map output key.
(1)Steps
1、A custom partitioner extends the Partitioner class.
2、Override its getPartition method; the int it returns determines the partition.
3、Register the custom partitioner class in the driver.
4、Also set the number of reduce tasks in the driver (the number of partitions equals the number of reduce tasks).
5、Each reduce task produces its own output file.
(2)FlowPartitioner
package cn.edu.mapreduce.flow;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class FlowPartitioner extends Partitioner<Text, Flow>{
@Override
public int getPartition(Text key, Flow value, int numPartitions) {
// the returned index must lie in [0, numPartitions); each partition is handled by one reduce task and written to its own output file
if(value.getAddress().equals("bj")) {
return 0;
}else if(value.getAddress().equals("sh")) {
return 1;
}else {
return 2;
}
}
}
(3)FlowDriver2
package cn.edu.mapreduce.flow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowDriver2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "FlowDriver");
job.setJarByClass(cn.edu.mapreduce.flow.FlowDriver2.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Flow.class);
job.setPartitionerClass(FlowPartitioner.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(3); // one reduce task per partition: bj, sh, other
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/flow.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/flowresult"));
if (!job.waitForCompletion(true))
return;
}
}
五、Student (sorting)
1、Student.java (sort by score in descending order)
package cn.edu.mapreduce.score;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Student implements WritableComparable<Student>{
private String name = "";
private int score;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public Student(String name, int score) {
super();
this.name = name;
this.score = score;
}
public Student() {
super();
}
@Override
public String toString() {
return "Student [name=" + name + ", score=" + score + "]";
}
@Override
public void readFields(DataInput in) throws IOException {
this.name = in.readUTF();
this.score = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(score);
}
@Override
public int compareTo(Student o) {
return o.score - this.score; // higher score first (descending order)
}
}
2、StudentMapper.java
package cn.edu.mapreduce.score;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class StudentMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
String datas[] = ivalue.toString().split(" "); // each line: name score (space-separated)
Student student = new Student();
student.setName(datas[0]);
student.setScore(Integer.parseInt(datas[1]));
context.write(student, NullWritable.get());
}
}
3、StudentReducer.java
package cn.edu.mapreduce.score;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class StudentReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
public void reduce(Student _key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(_key, NullWritable.get()); // keys arrive already sorted by Student.compareTo
}
}
4、StudentDriver.java
package cn.edu.mapreduce.score;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class StudentDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "StudenDriver");
job.setJarByClass(cn.edu.mapreduce.score.StudentDriver.class);
job.setMapperClass(StudentMapper.class);
job.setReducerClass(StudentReducer.class);
job.setOutputKeyClass(Student.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.232.129:9000/txt/student.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/studentresult"));
if (!job.waitForCompletion(true))
return;
}
}
5、Student2.java (sort by month ascending, then by score descending)
Compared with Student, Student2 adds a month field and changes the sort rule; the comparison code is as follows:
@Override
public int compareTo(Student2 o) {
int r1 = this.month - o.month; // compare by month first, ascending
if (r1 == 0)
return o.score - this.score; // same month: higher score first
else
return r1;
}
6、StudentMapper2, StudentReducer2, StudentDriver2
These are essentially the same as before; just replace every occurrence of Student with Student2 in StudentMapper, StudentReducer and StudentDriver.
六、CharCount
1、CharCountMapper.java
package cn.edu.mapreduce.charcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CharCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
public void map(LongWritable ikey, Text ivalue, Context context) throws IOException, InterruptedException {
String line = ivalue.toString();
char[] chars = line.toCharArray();
for (char c : chars) {
context.write(new Text(c+""), new IntWritable(1));
}
}
}
2、CharCountReducer.java
package cn.edu.mapreduce.charcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class CharCountReducer extends Reducer<Text, IntWritable, Text, LongWritable> {
public void reduce(Text _key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
long sum=0;
for (IntWritable val : values) {
sum+=val.get();
}
context.write(_key, new LongWritable(sum));
}
}
3、CharCountDriver.java
package cn.edu.mapreduce.charcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CharCountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "charcountdriver");
job.setJarByClass(cn.edu.mapreduce.charcount.CharCountDriver.class);
job.setMapperClass(CharCountMapper.class);
job.setReducerClass(CharCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.60.128:9000/txt/characters.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.60.128:9000/charcount"));
if (!job.waitForCompletion(true))
return;
}
}
4、CharCountDriver2 (multiple input files)
package cn.edu.mapreduce.charcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CharCountDriver2 {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "JobName");
job.setJarByClass(cn.edu.mapreduce.charcount.CharCountDriver2.class);
job.setMapperClass(CharCountMapper.class);
job.setReducerClass(CharCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
MultipleInputs.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/words.txt"), TextInputFormat.class);
MultipleInputs.addInputPath(job, new Path("hdfs://192.168.232.129:9000/txt/characters.txt"), TextInputFormat.class);
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.232.129:9000/charcount2"));
if (!job.waitForCompletion(true))
return;
}
}
七、A few Eclipse tips
1、Eclipse auto-completion
Window → Preferences → Java → Editor → Content Assist, then enter the following in the auto-activation triggers box (the pink box in the figure):
.qwertyuioplkjhgfdsazxcvbnm
2、Eclipse: auto-generate @Override methods and interface implementations
Right-click → Source → Override/Implement Methods… (this lists the parent class's methods so you can choose which ones to override):
3、Eclipse shortcuts
Shift+Alt+S: open the Source menu (generate getters/setters, constructors, toString, etc.)