一、MapReduce概述
(一)MapReduce概述
MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在Hadoop集群上。
框架是一个半成品,我们要学习的就是这些默认组件以及怎么将自己的业务逻辑代码与框架进行有机结合形成一个完整的应用程序。
(二)MapReduce优缺点
1、优点
????????易于编程
简单地实现一些接口或者继承一些框架准备好的类,重写一些方法,就能够与框架进行有机结合,完成一个分布式程序,并部署到大量廉价的PC机器上运行。因为这个特点使得MapReduce编程变的非常流行。
????????良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力,而不需要重新编写代码。
????????高容错性
MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,而廉价的机器性能差,运行时很容易出现错误,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。
????????适合PB级别数据处理
它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。
????????2、缺点
????????不擅长实时计算
这个是从返回结果的实效上来说的,MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。
????????不擅长流计算
这个是从它能处理的数据上来说的,流式计算的输入数据是动态的,像河流一样,源源不断,而MapReduce能够处理的输入数据集是静态的,不能动态变化。在设计时,就没打算处理动态的数据。
????????不擅长DAG计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到hdfs,还要从hdfs中读出,在大数据的场景下,更会造成大量的磁盘IO,导致性能非常的低下。
(三)MapReduce进程(MR)
程序由单机版扩成分布式版时,会引入大量的复杂工作,如运算至少分为两个过程,先并行计算,然后统一汇总,这两个阶段如何启动如何协调,数据找程序还是程序找数据,任务由谁分配怎么分配,如何处理容错,如何监控,出错如何重试.....,MapReduce 把大量分布式程序都会涉及的到的内容都封装起来,由三类进程去管理,让开发人员可以将精力集中于业务逻辑。以下便是这三类进程。
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责map阶段的整个数据处理流程。
- ReduceTask:负责reduce阶段的整个数据处理流程。
(四)MapReduce编程规范
MapReduce的思想核心是“分而治之”,用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端)
1、Mapper阶段
该阶段由框架把输入数据以kv对的形式读取进来,我们需要将其处理之后再组装为kv对写出到reduce。
??? (1)用户自定义的Mapper要继承自己的父类
??? (2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
??? (3)Mapper中的业务逻辑写在map()方法中
??? (4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
??? (5)map()方法(MapTask进程)对每一个<K,V>调用一次
2、Reducer阶段
该阶段接收来自mapper写出的kv对数据,在这里经过处理后,再组装好kv对写出到结果文件
??? (1)用户自定义的Reducer要继承自己的父类
??? (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
??? (3)Reducer的业务逻辑写在reduce()方法中
??? (4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3、Driver阶段
相当于yarn集群的客户端,用于提交我们整个程序到yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象。然后由yarn去执行我们写好的mapper、reduce,分别生成MapTask、ReduceTask。
二、MapReduce入门案例
单词计数是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版"Hello World"。
(一)WordCount需求分析?
??? 在给定的文本文件中统计输出每一个单词出现的总次数
?按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。
?
(二)WordCount案例编写
(1)新建工程并导入pom依赖????
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
(2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:
log4j.rootLogger=debug, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
(3)编写MapReduce程序
a.创建包com.bigdata.mapreduce
?b.编写mapper类
package com.bigdata.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
// KEYIN, VALUEIN 输入数据key,value的类型 取决于框架 <行首偏移量,每行内容>
// 框架会帮我们读取数据,将每行行首的偏移量封装进key变量,例如0,13,28这些是数字,不用int,long来表示,
// 此处用LongWritable表示,将每行的内容,例如hello world,封装进value变量,这些是字符串, 不用String表示,用Text,
// KEYOUT, VALUEOUT 输出数据key,value的类型 取决于咱们自己的业务逻辑
// 因为我们将框架给我们送过来的数据处理完毕后,要组装kv对输出,我们想要将单词作为key输出,用Text表示
// 将单词出现的次数,即1,作为value输出,用IntWritable表示 <单词,1>
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();
IntWritable v = new IntWritable(1);
Logger logger = LogManager.getLogger(WordcountMapper.class);
// 在这里填入自己的业务逻辑
// 框架会帮我们读取数据,先读取第一行,将第一行的行首偏移量,例如0,封装进key变量,将每行内容,例如 hello world,封装进value
// 然后调用map方法,我们在map里面写入处理数据的逻辑,处理完毕之后,框架再读取第二行,将第二行行首的偏移量封装进key,将第二行的
// 内容封装进value,再去调用map方法,
// map方法什么时候调用一次?框架每读取一行数据,封装一次kv对,调用一次map方法
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 业务逻辑
// 1 将每行内容转化为字符串,hello world
String line = value.toString();
// 2 将每行内容按照空格切割,[hello,world]
String[] words = line.split(" ");
// 3 遍历数组,取出每个单词 ,hello,
for (String word : words) {
// 4 按照<单词,1> 的样式,组装kv对
logger.info("hhhhhhhhhhhh:"+word);
System.out.println("hahahaha");
k.set(word);
// v.set(1);
// 5 将kv对写出
context.write(k,v);
}
}
}
c.编写reducer类
package com.bigdata.mapreduce;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// KEYIN,VALUEIN 输入数据kv的类型 与mapper的keyout,valueout相对应
// KEYOUT,VALUEOUT 输出数据kv的类型 取决于reduce阶段的逻辑,这里我们想要输出<单词,单词出现的总次数>
public class WordcountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
//reduce方法什么使用调用一次?分组调用,框架会对所有的map输出的kv根据k进行分组,k相同的kv们为一组,对这一组kv们调用一次reduce方法
// 会将k封装进key,将v们封装进迭代器values变量
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// <hello,1>
// <hello,1>
// <hello,1>
//遍历values,将每个v进行累加
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
sum = sum + i;
}
context.write(key,new IntWritable(sum));
}
}
d.编写驱动类
package com.bigdata.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordcountDriver {
public static void main(String[] args) throws Exception {
// 1 创建一个配置对象
Configuration conf = new Configuration();
// 2 通过配置对象创建一个job
Job job = Job.getInstance(conf);
// 3 设置job的mr的路径(jar包的位置)
job.setJarByClass(WordcountDriver.class);
// 4 设置job的mapper类 reduce类
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReduce.class);
// 5 设置job的mapper类的keyout,valueout
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 6 设置job的最终输出的keyout,valueout
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 7 设置job的输入数据的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
FileOutputFormat.setOutputPath(job,new Path(args[1]));
// 9 提交job到yarn集群
boolean b = job.waitForCompletion(true);
System.out.println("是否运行成功:"+b);
}
}
(4)本地测试
a.在windows环境上配置HADOOP_HOME环境变量
i.? 将hadoop-3.2.1.tar.gz和winutils.zip进行解压
ii. 将winutils的所有文件拷贝到解压后的hadoop-3.2.1下的bin目录下
iii.配置HADOOP_HOME环境变量并进行验证
b.在eclipse/idea上运行程序
(5)集群上测试
a.将程序打成jar包,然后拷贝到hadoop集群中
b.启动hadoop集群
c.执行wordcount程序
提前将若干个要统计单词的文本上传到hdfs的/test/wc/input目录
配置集群的日志聚集功能:
core-site.xml中添加:
<!-- 配置 HDFS 网页登录使用的静态用户为offcn -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>offcn</value>
</property>
mapred-site.xml中添加:
<!-- 历史服务器端地址 -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>bd-offcn-01:10020</value>
</property>
<!-- 历史服务器web端地址 -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>bd-offcn-01:19888</value>
</property>
yarn-site.xml中添加:
<!-- 开启日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置日志聚集服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://bd-offcn-01:19888/jobhistory/logs</value>
</property>
<!-- 设置日志保留时间为 7 天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
分发3个文件到其它机器:并重启hdfs、yarn,启动历史服务器
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/core-site.xml bd-offcn-02:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/core-site.xml bd-offcn-03:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/mapred-site.xml bd-offcn-02:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/mapred-site.xml bd-offcn-03:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/yarn-site.xml bd-offcn-02:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
[offcn@bd-offcn-01 ~]$ scp /home/offcn/apps/hadoop-3.2.1/etc/hadoop/yarn-site.xml bd-offcn-03:/home/offcn/apps/hadoop-3.2.1/etc/hadoop/
启动历史服务器:
[offcn@bd-offcn-01 ~]$ mapred --daemon start historyserver
[offcn@bd-offcn-01 ~]$ hadoop jar mymapreduce-1.0-SNAPSHOT.jar com.bigdata.wordcount.WordcountDriver /test/wc/input /test/wc/output
三、Hadoop序列化
(一)序列化概念
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储(持久化)和网络传输。?
反序列化就是将收到字节序列(或其他数据传输协议)或者是硬盘的持久化数据,转换成内存中的对象。
(二)java和hadoop序列化对比
?Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable),特点如下:
????????紧凑
紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资源。
????????快速
进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的。
????????可扩展
协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文
????????互操作
能支持不同语言写的客户端和服务端进行交互。?
(三)常用数据序列化类型
Java类型 | Hadoop Writable类型 | boolean | BooleanWritable | byte | ByteWritable | int | IntWritable | float | FloatWritable | long | LongWritable | double | DoubleWritable | String | Text | map | MapWritable | array | ArrayWritable |
(四)自定义序列化对象(实现接口Writable)
Hadoop给我们准备的常用类型已经具备hadoop的序列化标准了,可以进行序列化传输,而我们自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下几项:
(1)必须实现Writable接口
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造
(3)重写序列化方法,(注意序列化的字段的顺序必须与反序列化的顺序一致)
(4)重写反序列化方法
(5)注意反序列化的顺序和序列化的顺序完全一致
(6)要想把结果显示在文件中,需要重写toString(),可用”\t”分开,方便后续用。
(五)序列化案例实操
1、需求
统计每一个手机号全年的总话费(注意,虚拟网包月费属于赠送费,不计入在内)。
2、数据准备
3、分析
基本思路:
Map阶段:
a.读取一行数据,切分字段;
b.抽取手机号、套餐基本费、语音通信费、短信彩信费、流量费;
c.以手机号为key,bean对象为value输出,即context.write(手机号,bean)。
Reduce阶段:
a.累加套餐基本费、语音通信费、短信彩信费、流量费得到总花费;
b.实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输;
c. MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。
注意:重点是在mapper阶段以手机号为key,想想为什么?
4、编写mapreduce程序
(1)编写流量统计的bean对象
package com.bigdata.phonefee;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
// 如果要将自定义类的对象放到keyout,或者valueout的位置上,hadoop会将这些对象进行序列化和反序列化,
// 因此必要让我们自定义的类具备hadoop序列化的要求
// 1 实现Writable接口
public class Phone implements Writable {
private Long baseFee;
private Long voiceFee;
private Long msgFee;
private Long flowFee;
private Long sumFee;
// 2 准备空参构造
public Phone() {
}
public Phone(Long baseFee, Long voiceFee, Long msgFee, Long flowFee, Long sumFee) {
this.baseFee = baseFee;
this.voiceFee = voiceFee;
this.msgFee = msgFee;
this.flowFee = flowFee;
this.sumFee = sumFee;
}
// 设置参数的便利方法
public void setFee(Long baseFee, Long voiceFee, Long msgFee, Long flowFee) {
this.baseFee = baseFee;
this.voiceFee = voiceFee;
this.msgFee = msgFee;
this.flowFee = flowFee;
this.sumFee = baseFee + voiceFee + msgFee + flowFee;
}
public Long getBaseFee() {
return baseFee;
}
public void setBaseFee(Long baseFee) {
this.baseFee = baseFee;
}
public Long getVoiceFee() {
return voiceFee;
}
public void setVoiceFee(Long voiceFee) {
this.voiceFee = voiceFee;
}
public Long getMsgFee() {
return msgFee;
}
public void setMsgFee(Long msgFee) {
this.msgFee = msgFee;
}
public Long getFlowFee() {
return flowFee;
}
public void setFlowFee(Long flowFee) {
this.flowFee = flowFee;
}
public Long getSumFee() {
return sumFee;
}
public void setSumFee(Long sumFee) {
this.sumFee = sumFee;
}
// 3 准备序列化的方法,指定将哪些属性 进行序列化和反序列化
// 不是Write方法
public void write(DataOutput out) throws IOException {
out.writeLong(baseFee);
out.writeLong(voiceFee);
out.writeLong(msgFee);
out.writeLong(flowFee);
out.writeLong(sumFee);
}
// 4 准备反序列化的方法,注意:序列化和反序列化的属性的顺序要一致
public void readFields(DataInput in) throws IOException {
this.baseFee = in.readLong();
this.voiceFee = in.readLong();
this.msgFee = in.readLong();
this.flowFee = in.readLong();
this.sumFee = in.readLong();
}
// 5 toString
@Override
public String toString() {
return baseFee+"\t"+voiceFee+"\t"+msgFee+"\t"+flowFee+"\t"+sumFee;
}
}
(2)编写mapper
package com.bigdata.phonefee;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// 因为我们要在reduce对某个手机号的多条消费数据进行累加
//因此,我们就想让同个手机号的多条数据进入到同一个分组,进而调用一次reduce方法
// 进而才有机会对同个手机号的多条数据进行累加,手机号用Text表示
// 因为每条数据有多个消费情况,例如基本费,语音通信费,短信费,用IntWritable这样的数据类型,不足以封装
// 因此,创建一个Phone的实体类,用该类封装这些数据
public class PhonefeeMapper extends Mapper<LongWritable, Text, Text, Phone> {
Text k = new Text();
Phone v = new Phone();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 将每行数据转成 String , A 13939119984 3 5 7 8 20 201901
String line = value.toString();
// 2 按照tab分割 [A,13939119984,3,5,7,8,20,201901]
String[] split = line.split("\t");
// 3 挑出手机号,各项话费
String phoneNum = split[1];
String baseFee = split[2];
String voiceFee = split[3];
String msgFee = split[4];
String flowFee = split[5];
// 4 封装kv
k.set(phoneNum);
//v.setBaseFee(Long.parseLong(baseFee));
//v.setVoiceFee(Long.parseLong(voiceFee));
v.setFee(Long.parseLong(baseFee),Long.parseLong(voiceFee),Long.parseLong(msgFee),Long.parseLong(flowFee));
// 5 将kv写出
context.write(k,v);
}
}
(3)编写reducer
package com.bigdata.phonefee;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// 让手机号担任keyout
// 让手机号全年的话费情况当valueout
// <手机号,总话费Phone>
public class PhonefeeReduce extends Reducer<Text, Phone,Text,Phone> {
Phone v = new Phone();
@Override // 分组调用
protected void reduce(Text key, Iterable<Phone> values, Context context) throws IOException, InterruptedException {
//<13636744666, 5 10 5 20 18>
//<13636744666, 5 20 8 33 18>
//<13636744666, 6 10 8 33 18>
// 累加各项费用
long sumBaseFee = 0 ;
long sumVoiceFee = 0 ;
long sumMsgFee = 0 ;
long sumFlowFee = 0 ;
for (Phone value : values) {
Long baseFee = value.getBaseFee();
Long voiceFee = value.getVoiceFee();
Long msgFee = value.getMsgFee();
Long flowFee = value.getFlowFee();
sumBaseFee = sumBaseFee + baseFee;
sumVoiceFee = sumVoiceFee + voiceFee;
sumMsgFee = sumMsgFee + msgFee;
sumFlowFee = sumFlowFee + flowFee;
}
v.setFee(sumBaseFee,sumVoiceFee,sumMsgFee,sumFlowFee);
// 将kv写出
context.write(key,v);
}
}
(4)编写驱动
package com.bigdata.phonefee;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PhonefeeDriver {
public static void main(String[] args) throws Exception {
// 1 创建一个配置对象
Configuration conf = new Configuration();
// 2 通过配置对象创建一个job
Job job = Job.getInstance(conf);
// 3 设置job的mr的路径(jar包的位置)
job.setJarByClass(PhonefeeDriver.class);
// 4 设置job的mapper类 reduce类
job.setMapperClass(PhonefeeMapper.class);
job.setReducerClass(PhonefeeReduce.class);
// 5 设置job的mapper类的keyout,valueout
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Phone.class);
// 6 设置job的最终输出的keyout,valueout
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Phone.class);
// 7 设置job的输入数据的路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
// 8 设置job的输出数据的路径 得保证,输出目录不能事先存在,否则报错,
Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);
}
FileOutputFormat.setOutputPath(job,outPath);
// 9 提交job到yarn集群
boolean b = job.waitForCompletion(true);
System.out.println("是否运行成功:"+b);
}
}
|