1、序列化概述
1.1 需求
把服务器Hadoop102放在内存中的数据:ss和cls移动到 Hadoop103 上
1.2 实现方式
把内存中的ss和cls持久化到磁盘(字节码),拷贝到103,再加载到内存
1.3 概念
1.4 好处
解决了一个系统的内存对象,无法直接传到另一个系统的内存对象的问题
1.5 java序列化
- 疑问
- java中有自带的 Serializable 接口,为什么不使用呢
- 原因
- java的序列化比较重,包含 数据+各种校验信息/继承体系/头信息
- 对于hadoop来说,只需要 数据+简单校验
1.6 好处
结构紧凑、存储空间少、传输快
2、自定义bean对象实现序列化接口
2.1 问题
hadoop提供的10个常用序列化类型不能满足所有要求
2.2 步骤
a、实现writable接口 如果要对一个bean对象实现服务器之间的传输,需要实现序列化接口,就像之前WordCount中的k,v,都是基本序列化类型
b、反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() {
super();
}
c、重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
d、重写反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
e、注意反序列化的顺序和序列化的顺序完全一致
f、要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
- 如果不重写,默认打印的是地址值
- 打印地址里的数据需要使用toString()
g、如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口
- Map<key,val,key,val>,其中,第二个key需要实现Comparable接口,因为shuffle阶段会对key进行排序
- 详见后面排序案例。
3、序列化案例实操
3.1 需求分析
统计每一个手机号耗费的总上行流量、总下行流量、总流量 (提示:总流量 = 总上行流量+总下行流量)
输入数据
自增id | 手机号 | 网络IP | 域名 | 上行 | 下行 | 网络状态 |
---|
1 | 13736230513 | 192.196.100.1 | www.atguigu.com | 2481 | 24681 | 200 | 2 | 13846544121 | 192.196.100.2 | | 264 | 0 | 200 | 3 | 13956435636 | 192.196.100.3 | | 132 | 1512 | 200 | 4 | 13966251146 | 192.168.100.1 | | 240 | 0 | 404 | 5 | 18271575951 | 192.168.100.2 | www.atguigu.com | 1527 | 2106 | 200 | 6 | 84188413 | 192.168.100.3 | www.atguigu.com | 4116 | 1432 | 200 | 7 | 13590439668 | 192.168.100.4 | | 1116 | 954 | 200 | 8 | 15910133277 | 192.168.100.5 | www.hao123.com | 3156 | 2936 | 200 | 9 | 13729199489 | 192.168.100.6 | | 240 | 0 | 200 | 10 | 13630577991 | 192.168.100.7 | www.shouhu.com | 6960 | 690 | 200 | 11 | 15043685818 | 192.168.100.8 | www.baidu.com | 3659 | 3538 | 200 | 12 | 15959002129 | 192.168.100.9 | www.atguigu.com | 1938 | 180 | 500 | 13 | 13560439638 | 192.168.100.10 | | 918 | 4938 | 200 | 14 | 13470253144 | 192.168.100.11 | | 180 | 180 | 200 | 15 | 13682846555 | 192.168.100.12 | www.qq.com | 1938 | 2910 | 200 | 16 | 13992314666 | 192.168.100.13 | www.gaga.com | 3008 | 3720 | 200 | 17 | 13509468723 | 192.168.100.14 | www.qinghua.com | 7335 | 110349 | 404 | 18 | 18390173782 | 192.168.100.15 | www.sogou.com | 9531 | 2412 | 200 | 19 | 13975057813 | 192.168.100.16 | www.baidu.com | 11058 | 48243 | 200 | 20 | 13768778790 | 192.168.100.17 | | 120 | 120 | 200 | 21 | 13568436656 | 192.168.100.18 | www.alibaba.com | 2481 | 24681 | 200 | 22 | 13568436656 | 192.168.100.19 | | 1116 | 954 | 200 |
期望输出格式
手机号码 | 上行流量 | 下行流量 | 总流量 |
---|
13470253144 | 180 | 180 | 360 |
map的参数
<偏移量,一行数据,key,value>
- 思考:key 和 value 值是什么
- key:手机号
- value:JavaBean(FlowBean)
- JavaBean需要哪些数据
- 手机号
- 上行流量
- 下行流量
- 总流量后期计算得出
- 其他无关字段不取
Map阶段
- 读取一行数据,切分字段;
- 抽取手机号,上行流量,下行流量;
- 以手机号为key,bean对象为value输出,即context.write(手机号,bean)。
Reduce阶段
3.2 编写步骤
3.2.1 编写流量统计的bean对象
public class FlowBean implements Writable {
private long upFlow;
private long dowmFlow;
private long sumFlow;
public FlowBean() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(dowmFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
dowmFlow = in.readLong();
sumFlow = in.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDowmFlow() {
return dowmFlow;
}
public void setDowmFlow(long dowmFlow) {
this.dowmFlow = dowmFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.dowmFlow + this.upFlow;
}
@Override
public String toString() {
return upFlow +"\t"+ dowmFlow +"\t" + sumFlow;
}
}
a、实现writable接口
b、反序列化时,需要反射调用空参构造函数,所以必须有空参构造
c、重写序列化方法
d、重写反序列化方法
e、注意反序列化的顺序和序列化的顺序完全一致
f、要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用。
- 如果不重写,默认打印的是地址值
- 打印地址里的数据需要使用toString()
g、如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口
- Map<key,val,key,val>,其中,第二个key需要实现Comparable接口,因为shuffle阶段会对key进行排序
3.2.2 编写Mapper类
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] word = line.split("\t");
String phone = word[1];
String upFlow = word[word.length - 3];
String downFlow = word[word.length - 2];
outK.set(phone);
outV.setUpFlow(Long.parseLong(upFlow));
outV.setDowmFlow(Long.parseLong(downFlow));
outV.setSumFlow();
context.write(outK,outV);
}
}
3.2.3 编写Reducer类
public class FlowReduce extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
long sumUp = 0;
long sumDown = 0;
for (FlowBean value : values) {
sumUp += value.getUpFlow();
sumDown += value.getDowmFlow();
}
outV.setUpFlow(sumUp);
outV.setDowmFlow(sumDown);
outV.setSumFlow();
context.write(key,outV);
}
}
3.2.4 编写Driver类
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(FlowDriver.class);
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path("D:\\input\\inputflow"));
FileOutputFormat.setOutputPath(job,new Path("D:\\hadoop\\output12"));
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
3.3 最终输出
手机号码 | 上行流量 | 下行流量 | 总流量 |
---|
13470253144 | 180 | 180 | 360 | 13509468723 | 7335 | 110349 | 117684 | 13560439638 | 918 | 4938 | 5856 | 13568436656 | 3597 | 25635 | 29232 | 13590439668 | 1116 | 954 | 2070 | 13630577991 | 6960 | 690 | 7650 | 13682846555 | 1938 | 2910 | 4848 | 13729199489 | 240 | 0 | 240 | 13736230513 | 2481 | 24681 | 27162 | 13768778790 | 120 | 120 | 240 | 13846544121 | 264 | 0 | 264 | 13956435636 | 132 | 1512 | 1644 | 13966251146 | 240 | 0 | 240 | 13975057813 | 11058 | 48243 | 59301 | 13992314666 | 3008 | 3720 | 6728 | 15043685818 | 3659 | 3538 | 7197 | 15910133277 | 3156 | 2936 | 6092 | 15959002129 | 1938 | 180 | 2118 | 18271575951 | 1527 | 2106 | 3633 | 18390173782 | 9531 | 2412 | 11943 | 84188413 | 4116 | 1432 | 5548 |
|