1. MapReduce Programming Essentials
The map() method is called once for each input KV pair.
The reduce() method is called once for each group of KV pairs that share the same key.
1.1 Mapper Programming Summary
- Define a custom Mapper class that extends Hadoop's Mapper class
- The Mapper's input is a KV pair (the types can be customized)
- The Map-phase business logic goes in the map() method
- The Mapper's output is a KV pair (the types can be customized)
1.2 Reducer Programming Summary
- Define a custom Reducer class that extends Hadoop's Reducer class
- The Reducer's input types correspond to the Mapper's output types (KV pairs)
- The Reducer's business logic goes in the reduce() method
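These points map one-to-one onto a small amount of boilerplate. A minimal skeleton (the class names MyMapper/MyReducer and the KV types are placeholders, not taken from the examples below; imports omitted as in the rest of these notes):
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//invoked once per input KV pair; Map-phase business logic goes here
}
}
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
//invoked once per key, with all values that share that key
}
}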
2. Programming in Practice
2.1 WordCount
2.1.1 Requirement
Count and output the total number of occurrences of each word in a given text file.
wc.txt
1 2 3 4
5 6 5 6
2 4 7 9
Expected output
1 1
2 2
3 1
4 2
5 2
6 2
7 1
9 1
2.1.2 Approach
Map phase:
- 1. In map(), convert the incoming value to a String
- 2. Split the line on spaces to get the words
- 3. Emit <word, 1>
Reduce phase:
- 1. For each key (word), iterate over its values and accumulate the counts
- 2. Emit the key with its total count
2.1.3 Code
(1) map (output types: Text, IntWritable)
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final Text text = new Text();
private final IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
String[] values = word.split(" ");
for (String s : values) {
text.set(s);
context.write(text,one);
}
}
}
(2) reduce (output types: Text, IntWritable)
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private final IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
sum += i;
}
result.set(sum);
context.write(key,result);
}
}
(3) driver
/**
* Count the total occurrences of each word in the source file
*/
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCountDriver");
job.setJarByClass(WordCountDriver.class);
//set the mapper
job.setMapperClass(WordCountMapper.class);
//set the reducer
job.setReducerClass(WordCountReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(IntWritable.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(IntWritable.class);
//job.setNumReduceTasks(3);
//input path
FileInputFormat.setInputPaths(job,new Path("D:\\code\\wc\\input\\wc.txt"));
//output path
FileOutputFormat.setOutputPath(job,new Path("D:\\code\\wc\\output"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}
Note: if the driver enables job.setNumReduceTasks(3);
then 3 output files are produced, one per reduce task.
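Which of the 3 files a given word lands in is decided by the default partitioner. For reference, Hadoop's built-in HashPartitioner does essentially the following:
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
//mask off the sign bit so the result is non-negative, then mod by the task count
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}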
2.1.4 Optimization (adding a Combiner for pre-aggregation)
(1) map: same as above (unchanged)
(2) WordCountCombiner (types: Text, IntWritable, Text, IntWritable)
/**
* Pre-aggregation (combiner) class that runs on the map-task side
*/
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
private final IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
sum += i;
}
result.set(sum);
context.write(key,result);
}
}
(3) reduce: same as above (unchanged)
(4) WordCountCombinerDriver
In the driver class from 2.1.3 above, add the following line after the 'final output value type' setting:
job.setCombinerClass(WordCountCombiner.class);
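Since WordCountCombiner above is identical to WordCountReducer, a common shortcut is to register the reducer class itself as the combiner; this is only safe because word-count aggregation is commutative and associative:
job.setCombinerClass(WordCountReducer.class);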
2.2 Partitioning with a Custom Object (appKey as the key, a custom bean as the value)
For plain custom-object output without partitioning, simply drop the CustomPartition class and the driver's setPartitionerClass call.
2.2.1 Requirement
Input file
01 a00df6s kar 120.196.100.99 384 33 200
11 0sfs01 kar 120.196.100.99 198 86 200
21 adfd00fd5 pandora 120.196.100.99 513 261 200
31 0ad0s7 pandora 120.196.100.99 840 413 200
41 0sfs01 kar 120.196.100.99 190 401 200
51 00fdaf3 kar 120.196.100.99 273 527 200
61 00fdaf3 pandora 120.196.100.99 527 273 200
71 0a0fe2 kar 120.196.100.99 730 496 200
81 0a0fe2 pandora 120.196.100.99 367 759 200
91 0sfs01 kar 120.196.100.99 529 484 200
101 adfd00fd5 kar 120.196.100.99 516 18 200
...
Expected output
00fdaf3 00fdaf3 32757 33167 65924
00wersa4 00wersa4 30689 35191 65880
0a0fe2 0a0fe2 43085 44254 87339
0ad0s7 0ad0s7 31702 29183 60885
0sfs01 0sfs01 31883 29101 60984
a00df6s a00df6s 33239 36882 70121
adfd00fd5 adfd00fd5 30727 31491 62218
2.2.2 Code
(1) PartitionBean
public class PartitionBean implements Writable {
//id
private String id;
//device id
private String deviceId;
//vendor (app key)
private String appKey;
//ip
private String ip;
//self-owned duration
private long selfDuration;
//third-party duration
private long thirdPartDuraton;
//status
private String status;
public PartitionBean() {
super();
}
public PartitionBean(String id, String deviceId, String appKey, String ip, long selfDuration, long thirdPartDuraton, String status) {
this.id = id;
this.deviceId = deviceId;
this.appKey = appKey;
this.ip = ip;
this.selfDuration = selfDuration;
this.thirdPartDuraton = thirdPartDuraton;
this.status = status;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id);
dataOutput.writeUTF(deviceId);
dataOutput.writeUTF(appKey);
dataOutput.writeUTF(ip);
dataOutput.writeLong(selfDuration);
dataOutput.writeLong(thirdPartDuraton);
dataOutput.writeUTF(status);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readUTF();
this.deviceId = dataInput.readUTF();
this.appKey = dataInput.readUTF();
this.ip = dataInput.readUTF();
this.selfDuration = dataInput.readLong();
this.thirdPartDuraton = dataInput.readLong();
this.status = dataInput.readUTF();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getAppKey() {
return appKey;
}
public void setAppKey(String appKey) {
this.appKey = appKey;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(long selfDuration) {
this.selfDuration = selfDuration;
}
public long getThirdPartDuraton() {
return thirdPartDuraton;
}
public void setThirdPartDuraton(long thirdPartDuraton) {
this.thirdPartDuraton = thirdPartDuraton;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
@Override
public String toString() {
return "{" +id + "\t" + deviceId + "\t" + appKey + "\t" + ip + selfDuration + "\t" + thirdPartDuraton + "\t" + status + '}';
}
}
(2) map (output types: Text, PartitionBean)
public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
private final Text text = new Text();
private PartitionBean partitionBean = new PartitionBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
//split on \t and read the fields
String[] arraySpeakProp = word.split("\t");
String id = arraySpeakProp[0];//id
String deviceId = arraySpeakProp[1];//device id
String appKey = arraySpeakProp[2];//vendor (app key)
String ip = arraySpeakProp[3];//ip
String selfDuration = arraySpeakProp[4];//self-owned duration
String thirdPartduration = arraySpeakProp[5];//third-party duration
String status = arraySpeakProp[6];//status
partitionBean.setId(id);
partitionBean.setDeviceId(deviceId);
partitionBean.setAppKey(appKey);
partitionBean.setIp(ip);
partitionBean.setSelfDuration(Long.parseLong(selfDuration));
partitionBean.setThirdPartDuraton(Long.parseLong(thirdPartduration));
partitionBean.setStatus(status);
text.set(appKey);
context.write(text, partitionBean);
}
}
(3) reduce (output types: Text, PartitionBean)
public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
@Override
protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
for (PartitionBean bean : values) {
context.write(key,bean);
}
}
}
(4) CustomPartition
/**
 * Custom partitioner: route each appKey to its own reduce task
 */
public class CustomPartition extends Partitioner<Text, PartitionBean> {
@Override
public int getPartition(Text text, PartitionBean partitionBean, int i) {
if (text.toString().equals("kar")){
return 0;
}else if(text.toString().equals("pandora")){
return 1;
}else{
return 2;
}
}
}
(5) driver
/**
 * Wires up the custom partitioner (CustomPartition) used at the map-task stage
 */
public class PartitionDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "PartitionDriver");
job.setJarByClass(PartitionDriver.class);
//set the mapper
job.setMapperClass(PartitionMapper.class);
//set the reducer
job.setReducerClass(PartitionReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(PartitionBean.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(PartitionBean.class);
//custom partitioner; the reduce task count must match the number of partitions
job.setPartitionerClass(CustomPartition.class);
job.setNumReduceTasks(3);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\speak\\input\\speak.data"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\speak\\out_partition"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
2.3 Total Ordering with a Custom Object (the custom object is the key, NullWritable the value)
2.3.1 Requirement
Input file
01 a00df6s kar 120.196.100.99 384 33 200
11 0sfs01 kar 120.196.100.99 198 86 200
21 adfd00fd5 pandora 120.196.100.99 513 261 200
31 0ad0s7 pandora 120.196.100.99 840 413 200
41 0sfs01 kar 120.196.100.99 190 401 200
51 00fdaf3 kar 120.196.100.99 273 527 200
61 00fdaf3 pandora 120.196.100.99 527 273 200
71 0a0fe2 kar 120.196.100.99 730 496 200
81 0a0fe2 pandora 120.196.100.99 367 759 200
91 0sfs01 kar 120.196.100.99 529 484 200
101 adfd00fd5 kar 120.196.100.99 516 18 200
Expected output (records sorted by total duration)
2.3.2 Code
(1) SpeakBeanComparable
/**
 * Custom bean implementing WritableComparable
 */
public class SpeakBeanComparable implements WritableComparable<SpeakBeanComparable> {
//self-owned duration
private long selfDuration;
//third-party duration
private long thirdPartDuraton;
//total duration
private long sumDuration;
//device id
private String deviceId;
public SpeakBeanComparable() {
super();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(selfDuration);
dataOutput.writeLong(thirdPartDuraton);
dataOutput.writeLong(sumDuration);
dataOutput.writeUTF(deviceId);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.selfDuration = dataInput.readLong();
this.thirdPartDuraton = dataInput.readLong();
this.sumDuration = dataInput.readLong();
this.deviceId = dataInput.readUTF();
}
public long getSelfDuration() {
return selfDuration;
}
public void setSelfDuration(long selfDuration) {
this.selfDuration = selfDuration;
}
public long getThirdPartDuraton() {
return thirdPartDuraton;
}
public void setThirdPartDuraton(long thirdPartDuraton) {
this.thirdPartDuraton = thirdPartDuraton;
}
public Long getSumDuration() {
return sumDuration;
}
public void setSumDuration(Long sumDuration) {
this.sumDuration = sumDuration;
}
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
@Override
public String toString() {
return deviceId + "\t" + selfDuration + "\t" + thirdPartDuraton + "\t" + sumDuration ;
}
@Override
public int compareTo(SpeakBeanComparable o) {
if(this.getSumDuration()<o.getSumDuration()){
return -1;
}else if(this.getSumDuration()>o.getSumDuration()){
return 1;
}else{
return 0;
}
}
}
(2) map (output types: SpeakBeanComparable, NullWritable)
public class SpeakSortMapper extends Mapper<LongWritable, Text, SpeakBeanComparable, NullWritable> {
final SpeakBeanComparable speakBean = new SpeakBeanComparable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
//split on \t and read the device id, self-owned duration, and third-party duration
String[] arraySpeakProp = word.split("\t");
String deviceId = arraySpeakProp[1];//device id
String selfDuration = arraySpeakProp[arraySpeakProp.length - 3];//self-owned duration
String thirdPartduration = arraySpeakProp[arraySpeakProp.length - 2];//third-party duration
String sumDuration = arraySpeakProp[arraySpeakProp.length - 1];//total duration
speakBean.setDeviceId(deviceId);
speakBean.setSelfDuration(Long.parseLong(selfDuration));
speakBean.setThirdPartDuraton(Long.parseLong(thirdPartduration));
speakBean.setSumDuration(Long.parseLong(sumDuration));
context.write(speakBean, NullWritable.get());
}
}
(3) reduce (output types: SpeakBeanComparable, NullWritable)
public class SpeakSortReducer extends Reducer<SpeakBeanComparable,NullWritable, SpeakBeanComparable,NullWritable> {
@Override
protected void reduce(SpeakBeanComparable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
for (NullWritable value : values) {
context.write(key,value);
}
}
}
(4) driver
/**
 * Scenario: output the source file's records sorted by a chosen field
 * Approach: use a custom bean as the key; its compareTo() drives the sort on that field
 */
public class SpeakSortDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SpeakSortDriver");
job.setJarByClass(SpeakSortDriver.class);
//set the mapper
job.setMapperClass(SpeakSortMapper.class);
//set the reducer
job.setReducerClass(SpeakSortReducer.class);
//map output key type
job.setMapOutputKeyClass(SpeakBeanComparable.class);
//map output value type
job.setMapOutputValueClass(NullWritable.class);
//final output key type
job.setOutputKeyClass(SpeakBeanComparable.class);
//final output value type
job.setOutputValueClass(NullWritable.class);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\speak\\out_bean"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\speak\\out_sort"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Notes:
- A global (total) sort requires that there be exactly one reduceTask!
- The default number of reduceTasks is 1.
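Put differently: with several reduce tasks, each output file would be sorted internally but there would be no order across files. The default already meets the requirement, and it can be made explicit in the driver:
//a total order requires every key to pass through the same reducer
job.setNumReduceTasks(1);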
2.4 Grouped Output (extend WritableComparator and override compare() so that records satisfying a condition are grouped as one key)
2.4.1 Requirement
Input file (columns: order id, product id, price)
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
Expected output: the highest-value transaction within each order
0000001 222.8
0000002 722.4
0000003 232.8
2.4.2 Code
(1) Bean
/**
 * Custom bean implementing WritableComparable: sort by order id ascending, then by price descending
 */
public class OrderBeanCompare implements WritableComparable<OrderBeanCompare> {
//price
private Double price;
//order id
private String orderId;
public OrderBeanCompare() {
super();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(orderId);
dataOutput.writeDouble(price);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderId = dataInput.readUTF();
this.price = dataInput.readDouble();
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
@Override
public String toString() {
return orderId + "\t" + price;
}
@Override
public int compareTo(OrderBeanCompare o) {
int res = this.orderId.compareTo(o.orderId);
if (res == 0) {
//same order id: negate the comparison so price sorts descending
res = -this.price.compareTo(o.price);
}
return res;
}
}
(2) map (output types: OrderBeanCompare, NullWritable)
public class OrderMapper extends Mapper<LongWritable, Text, OrderBeanCompare, NullWritable> {
final OrderBeanCompare orderBean = new OrderBeanCompare();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
String[] arry = word.split("\t");
String orderId = arry[0];//order id
String price = arry[2];//price
orderBean.setOrderId(orderId);
orderBean.setPrice(Double.parseDouble(price));
context.write(orderBean, NullWritable.get());
}
}
(3) Custom partitioner
/**
 * Partition by order id
 */
public class OrderPartition extends Partitioner<OrderBeanCompare, NullWritable> {
@Override
public int getPartition(OrderBeanCompare orderBean, NullWritable nullWritable, int numReduceTasks) {
//mask off the sign bit so the hash is non-negative, then mod by the task count
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
(4) Custom grouping comparator
/**
 * Records with the same order id are treated as one group
 */
public class OrderGroupCompareTor extends WritableComparator {
public OrderGroupCompareTor() {
super(OrderBeanCompare.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBeanCompare order1 = (OrderBeanCompare) a;
OrderBeanCompare order2 = (OrderBeanCompare) b;
return order1.getOrderId().compareTo(order2.getOrderId());
}
}
(5) reduce (output types: OrderBeanCompare, NullWritable)
public class OrderReducer extends Reducer<OrderBeanCompare, NullWritable, OrderBeanCompare, NullWritable> {
@Override
protected void reduce(OrderBeanCompare key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//within each group the first key is the highest-priced record (price sorts descending), so writing just the key yields the per-order maximum
context.write(key, NullWritable.get());
}
}
(6) driver
/**
 * Scenario: pick out the highest-value record in each order
 * Approach: use a custom bean as the key and sort via compareTo on the chosen fields,
 * first by order id, then by price descending
 */
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "OrderDriver");
job.setJarByClass(OrderDriver.class);
//set the mapper
job.setMapperClass(OrderMapper.class);
//set the reducer
job.setReducerClass(OrderReducer.class);
//map output key type
job.setMapOutputKeyClass(OrderBeanCompare.class);
//map output value type
job.setMapOutputValueClass(NullWritable.class);
//final output key type
job.setOutputKeyClass(OrderBeanCompare.class);
//final output value type
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(OrderPartition.class);
job.setGroupingComparatorClass(OrderGroupCompareTor.class);
//job.setNumReduceTasks(3);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\groupcomparetor\\groupingComparator.txt"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\groupcomparetor\\order_out"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
2.5 Join in Practice (map-side join of a small table with a large table)
2.5.1 Requirement
Input files
177725422 产品经理
177725433 大数据开发工程师
1001 177725422 2020-01-03
1002 177725422 2020-01-04
1002 177725433 2020-01-03
Expected output
1001 177725422 2020-01-03 产品经理
1002 177725422 2020-01-04 产品经理
1002 177725433 2020-01-03 大数据开发工程师
2.5.2 Code
(1) map (output types: Text, NullWritable)
/**
 * Cache the small table in memory during the mapTask stage
 */
public class MapperJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
final HashMap<String, String> map = new HashMap<>();
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//load the small table (position.txt) into the in-memory map before any map() calls
InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream(new File("D:/code/map_join/cache/position.txt")), "UTF-8");
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
String[] split = line.split("\t");
map.put(split[0], split[1]);
}
bufferedReader.close();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().split("\t");
String positionId = split[1];
//append the position name looked up in the cached small table
k.set(value.toString() + "\t" + map.get(positionId));
context.write(k,NullWritable.get());
}
}
(2) driver
/**
 * Join the large table against the cached small table (map-side join)
 */
public class MapperJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "MapperJoinDriver");
job.setJarByClass(MapperJoinDriver.class);
//set the mapper
job.setMapperClass(MapperJoinMapper.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(NullWritable.class);
//map-side join: no reduce phase needed
job.setNumReduceTasks(0);
//input path (the large table)
FileInputFormat.setInputPaths(job, new Path("D:/code/map_join/input/deliver_info.txt"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:/code/map_join/out"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
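The setup() method above reads the small table from a hard-coded local path, which only works when the job runs locally. On a real cluster the usual pattern (shown as a sketch; the cache path is an assumption) is to ship the small table through the distributed cache and read it from the task's working directory:
//driver side: ship the small table to every map task (hypothetical HDFS path)
job.addCacheFile(new URI("/cache/position.txt"));
//mapper setup() side: cached files are localized under their base name in the task's working directory
BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("position.txt"), "UTF-8"));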
2.6 CombineTextInputFormat Input (merging small files for WordCount)
The MR framework's default TextInputFormat computes splits per file:
no matter how small a file is, it becomes its own split and is processed by its own MapTask. With many small files, a correspondingly large number of MapTasks are created and launched, each processing only a little data, so most of the time is wasted on initializing, starting, and reclaiming resources, and utilization is poor.
CombineTextInputFormat targets this many-small-files scenario: it logically groups multiple small files into one split, so a single MapTask can process several files, improving resource utilization.
Usage
// if no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// set the maximum virtual-storage split size to 4 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
Worked example
Suppose setMaxInputSplitSize is 4 MB and there are four small files:
1.txt -> 2 MB; 2.txt -> 7 MB; 3.txt -> 0.3 MB; 4.txt -> 8.2 MB
Virtual-storage phase: each file is compared against the 4 MB maximum. A file of at most 4 MB forms one block; a file between 4 MB and 8 MB is cut in half; a larger file first has a 4 MB block cut off and the remainder is handled by the same rule.
This yields 7 virtual blocks: 2 MB, 3.5 MB, 3.5 MB, 0.3 MB, 4 MB, 2.1 MB, 2.1 MB.
Split phase: the blocks are then accumulated in order, and a split is closed as soon as the running total reaches 4 MB.
The result is 3 splits of sizes (2+3.5) MB, (3.5+0.3+4) MB, and (2.1+2.1) MB.
2.6.1 Requirement
Input
A directory of small WordCount input files to merge
Expected output
2.6.2 Code
(1) map (output types: Text, IntWritable)
public class TextInputformatMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final Text text = new Text();
private final IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
String[] values = word.split("\t");
text.set(values[0]);
context.write(text,one);
}
}
(2) reduce (output types: Text, IntWritable)
public class TextInputformatReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private final IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
int i = value.get();
sum += i;
}
result.set(sum);
context.write(key,result);
}
}
(3) driver
/**
 * Merge small files with CombineTextInputFormat
 */
public class TextInputformatDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TextInputformatDriver");
job.setJarByClass(TextInputformatDriver.class);
//set the mapper
job.setMapperClass(TextInputformatMapper.class);
//set the reducer
job.setReducerClass(TextInputformatReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(IntWritable.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(IntWritable.class);
// if no InputFormat is set, the default is TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
// set the maximum virtual-storage split size to 4 MB
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\input"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
2.7 Custom InputFormat (merging small files)
Both HDFS and MapReduce are very inefficient at handling small files, yet processing large numbers of small files is often unavoidable, so a suitable solution is needed.
One option is to implement a custom InputFormat that merges the small files.
2.7.1 Requirement
Merge multiple small files into one SequenceFile (a Hadoop file format that stores key-value pairs in binary form). The SequenceFile holds the contents of many files, using each file's path + name as the key and the file's bytes as the value.
Input files
Expected output
A single SequenceFile that merges the small files
Approach
1. Define a class that extends FileInputFormat
2. Override isSplitable() to mark files as non-splittable; override createRecordReader() to return a custom RecordReader
3. Change the default reading behavior: read one whole file at a time as a single KV pair
4. Set the custom InputFormat type on the driver
2.7.2 Code
(1) Custom FileInputFormat (output types: Text, BytesWritable)
/**
 * Custom input format: merge small files into one binary SequenceFile
 */
public class CustomFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
@Override
public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
CustomRecordReader customRecordReader = new CustomRecordReader();
customRecordReader.initialize(inputSplit,context);
return customRecordReader;
}
}
(2) Custom RecordReader (output types: Text, BytesWritable)
public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
private Boolean isProcess = true;
private FileSplit fileSplit;
private Configuration configuration;
private Text key = new Text();
private BytesWritable fileValue = new BytesWritable();
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) inputSplit;
this.configuration = context.getConfiguration();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (isProcess) {
//file path
Path path = fileSplit.getPath();
//file length
long length = fileSplit.getLength();
//open the file
FileSystem fs = path.getFileSystem(configuration);
FSDataInputStream input = fs.open(path);
byte[] contents = new byte[(int) length];
//read the whole file into contents
IOUtils.readFully(input, contents, 0, contents.length);
//set the value to the file contents
fileValue.set(contents,0,contents.length);
//set the key to the file path
key.set(path.toString());
IOUtils.closeStream(input);
isProcess = false;//each file is read exactly once, so flip the flag
return true;
}
return false;
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return fileValue;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
}
}
(3) map (output types: Text, BytesWritable)
public class CustomInputFormatMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}
(4) reduce (output types: Text, BytesWritable)
public class CustomInputFormatReducer extends Reducer<Text, BytesWritable,Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,values.iterator().next());
}
}
(5) driver
/**
 * Merge small files into one binary file (drawback: one mapTask per small file)
 */
public class CustomInputFormatDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CustomInputFormatDriver");
job.setJarByClass(CustomInputFormatDriver.class);
//set the mapper
job.setMapperClass(CustomInputFormatMapper.class);
//set the reducer
job.setReducerClass(CustomInputFormatReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(BytesWritable.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(BytesWritable.class);
//use the custom input format
job.setInputFormatClass(CustomFileInputFormat.class);
//write the merged pairs as a SequenceFile (assumed intent per the requirement; the default TextOutputFormat would write plain text)
job.setOutputFormatClass(SequenceFileOutputFormat.class);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\byte\\input"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\byte\\custom_out"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Conclusion from stepping through with breakpoints: the whole chain runs once per file:
CustomFileInputFormat --> CustomRecordReader --> map --> reduce
2.8 Custom OutputFormat
2.8.1 Requirement
Input file
Expected output: lines containing "baidu" are written to baidu.txt, all other lines to other.txt
2.8.2 Code
(1) map (output types: Text, NullWritable)
public class CustomOutputFormatMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}
(2) Custom CustomFileOutputFormat (output types: Text, NullWritable)
/**
 * Custom output format: opens one output stream per target file
 */
public class CustomFileOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataOutputStream out= fs.create(new Path("D:\\code\\outputformat\\out\\baidu.txt"));
FSDataOutputStream other = fs.create(new Path("D:\\code\\outputformat\\out\\other.txt"));
CustomRecordWriter customRecordWriter = new CustomRecordWriter(out,other);
return customRecordWriter;
}
}
(3) reduce (output types: Text, NullWritable)
public class CustomOutputFormatReducer extends Reducer<Text, NullWritable,Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
(4) CustomRecordWriter (output types: Text, NullWritable)
public class CustomRecordWriter extends RecordWriter<Text, NullWritable> {
private FSDataOutputStream out;
private FSDataOutputStream otherOut;
public CustomRecordWriter(FSDataOutputStream out, FSDataOutputStream other) {
this.out= out;
this.otherOut = other;
}
@Override
public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
String key = text.toString();
if (key.contains("baidu")) {
out.write(key.getBytes());
out.write("\r\n".getBytes());
} else {
otherOut.write(key.getBytes());
otherOut.write("\r\n".getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(out);
IOUtils.closeStream(otherOut);
}
}
(5) driver
/**
 * Split the output across files based on record content
 */
public class CustomOutputFormatDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CustomOutputFormatDriver");
job.setJarByClass(CustomOutputFormatDriver.class);
//set the mapper
job.setMapperClass(CustomOutputFormatMapper.class);
//set the reducer
job.setReducerClass(CustomOutputFormatReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(NullWritable.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(NullWritable.class);
//use the custom output format
job.setOutputFormatClass(CustomFileOutputFormat.class);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\outputformat\\input"));
//output path (FileOutputFormat still needs a directory for job metadata such as _SUCCESS)
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\outputformat\\custom_out"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
Conclusion from stepping through with breakpoints:
One mapTask per input file; map() is called once per line; getRecordWriter() on the custom OutputFormat is called only once for the whole run; reduce() is then called once per (deduplicated) key, and write() once per deduplicated key as well.
2.9 Compressing Files
2.9.1 Requirement
Compress the job's output file
Input file
Expected output
2.9.2 Code
(1) driver
public class SequenceFileOutputDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SequenceFileOutputDriver");
//no mapper/reducer set: the default identity Mapper and Reducer pass records through
//compress to keep the data volume down: less disk space, and less data to move over the network
//compression targeting SequenceFile output (assumed intent: without this line, the SequenceFile compression type below has no effect)
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
//compression type: per-record compression
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\input"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
(2) Reading the compressed file back
map
public class TextInputformatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
private final Text text = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
text.set(value.toString());
context.write(text,NullWritable.get());
}
}
reduce
public class TextInputformatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
driver
/**
 * Read the compressed output back for inspection
 */
public class TextInputformatDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "TextInputformatDriver");
job.setJarByClass(TextInputformatDriver.class);
//set the mapper
job.setMapperClass(TextInputformatMapper.class);
//set the reducer
job.setReducerClass(TextInputformatReducer.class);
//map output key type
job.setMapOutputKeyClass(Text.class);
//map output value type
job.setMapOutputValueClass(NullWritable.class);
//final output key type
job.setOutputKeyClass(Text.class);
//final output value type
job.setOutputValueClass(NullWritable.class);
//input path: the compressed output of the previous job
FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\output"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output2"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
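Besides compressing the final output, the intermediate map output (shuffle data) can also be compressed, purely through standard Hadoop configuration properties. A sketch (the choice of SnappyCodec is an assumption and requires native library support):
//enable compression of the intermediate map output, set before creating the Job
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", SnappyCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf, "MapOutputCompressionDemo");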
2.10 Merging Small Files with Custom Input and Custom Output File Names
2.10.1 Requirement
There is a set of order-review data that must be separated into good, neutral, and bad reviews and written to different output directories. In the data below, the ninth field is the review level: 0 = good, 1 = neutral, 2 = bad. The data must be classified by review level into different directories and, within each, sorted by time in descending order.
Input file
1 我想再来一个 \N 1 3 hello 来就来吧 0 2018-03-14 22:29:03
2 好的 \N 1 1 添加一个吧 说走咱就走啊 0 2018-03-14 22:42:04
3 haobuhao \N 1 1 nihao 0 2018-03-24 22:55:17
4 店家很好 非常好 \N 1 3 666 谢谢 0 2018-03-23 11:15:20
5 a'da'd \N 0 4 da'd's 打打操作 0 2018-03-22 14:52:42
6 达到 \N 1 4 1313 13132131 0 2018-03-07 14:30:38
7 321313 \N 1 4 111 1231231 1 2018-03-06 14:54:24
8 1 \N 1 4 1 110000 0 2018-03-23 14:54:42
9 666666666666666 \N 1 4 4444 7777777777777777 0 2018-03-01 10:58:24
10 00000000 \N 1 4 55555555555 009999999 1 2018-03-02 10:58:47
11 311111111111 \N 0 4 999999 1333333333 2 2018-03-03 10:59:10
Expected output
Approach
There are large numbers of small files like the sample above, so:
Step 1: compress/merge the small files
Step 2: partition the data by review level with a custom partitioner
Step 3: write the data to multiple directories with a custom OutputFormat
2.10.2 Code
(1) Compress the files
/**
 * Compress the input files
 */
public class SequenceFileOutputDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SequenceFileOutputDriver");
//compress to keep the data volume down: less disk space, and less data to move over the network
//compression targeting SequenceFile output (assumed intent, as in 2.9)
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
//compression type: per-record compression
SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
//input path
FileInputFormat.setInputPaths(job, new Path("D:\\code\\out_comment\\combine_text_input"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\out_comment\\seq_input"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
(2) Custom bean
public class MyCommentBean implements WritableComparable<MyCommentBean> {
private String orderId;
private String comment;
private String commentExt;
private int goodsNum;
private String phoneNum;
private String userName;
private String address;
private int commentStatus;
private String commentTime;
@Override
public String toString() {
return orderId+"\t"+comment+"\t"+commentExt+"\t"+goodsNum+"\t"+phoneNum+"\t"+userName+"\t"+address+"\t"+commentStatus+"\t"+commentTime;
}
//no-arg constructor
public MyCommentBean() {
}
public MyCommentBean(String orderId, String comment, String commentExt, int goodsNum, String phoneNum, String userName, String address, int commentStatus, String commentTime) {
this.orderId = orderId;
this.comment = comment;
this.commentExt = commentExt;
this.goodsNum = goodsNum;
this.phoneNum = phoneNum;
this.userName = userName;
this.address = address;
this.commentStatus = commentStatus;
this.commentTime = commentTime;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getComment() {
return comment;
}
public void setComment(String comment) {
this.comment = comment;
}
public String getCommentExt() {
return commentExt;
}
public void setCommentExt(String commentExt) {
this.commentExt = commentExt;
}
public int getGoodsNum() {
return goodsNum;
}
public void setGoodsNum(int goodsNum) {
this.goodsNum = goodsNum;
}
public String getPhoneNum() {
return phoneNum;
}
public void setPhoneNum(String phoneNum) {
this.phoneNum = phoneNum;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public int getCommentStatus() {
return commentStatus;
}
public void setCommentStatus(int commentStatus) {
this.commentStatus = commentStatus;
}
public String getCommentTime() {
return commentTime;
}
public void setCommentTime(String commentTime) {
this.commentTime = commentTime;
}
//sort rule: by comment time, descending
@Override
public int compareTo(MyCommentBean o) {
return o.getCommentTime().compareTo(this.commentTime);
}
//serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeUTF(comment);
out.writeUTF(commentExt);
out.writeInt(goodsNum);
out.writeUTF(phoneNum);
out.writeUTF(userName);
out.writeUTF(address);
out.writeInt(commentStatus);
out.writeUTF(commentTime);
}
//deserialization
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.comment = in.readUTF();
this.commentExt = in.readUTF();
this.goodsNum = in.readInt();
this.phoneNum = in.readUTF();
this.userName = in.readUTF();
this.address = in.readUTF();
this.commentStatus = in.readInt();
this.commentTime = in.readUTF();
}
}
(3) map (output types: MyCommentBean, NullWritable)
public class SeqMapper extends Mapper<LongWritable, Text, MyCommentBean, NullWritable> {
MyCommentBean commentBean = new MyCommentBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String word = value.toString();
String[] lineValue = word.split("\t");
//fields start at index 1: the first column is the key written by the compression job
if (lineValue.length >= 10) {
commentBean.setOrderId(lineValue[1]);
commentBean.setComment(lineValue[2]);
commentBean.setCommentExt(lineValue[3]);
commentBean.setGoodsNum(Integer.parseInt(lineValue[4]));
commentBean.setPhoneNum(lineValue[5]);
commentBean.setUserName(lineValue[6]);
commentBean.setAddress(lineValue[7]);
commentBean.setCommentStatus(Integer.parseInt(lineValue[8]));
commentBean.setCommentTime(lineValue[9]);
//emit inside the if so malformed lines don't write a stale/empty bean
context.write(commentBean,NullWritable.get());
}
}
}
(4) Custom partitioner
/**
 * Partition by review level (the bean's commentStatus: 0, 1, or 2)
 */
public class MyPartition extends Partitioner<MyCommentBean, NullWritable> {
@Override
public int getPartition(MyCommentBean commentBean, NullWritable nullWritable, int i) {
return commentBean.getCommentStatus();
}
}
(5) Custom output format
/**
 * Custom output format: one output file per review level
 */
public class MyFileOutputFormat extends FileOutputFormat<MyCommentBean, NullWritable> {
FSDataOutputStream good;
FSDataOutputStream common;
FSDataOutputStream bad;
@Override
public RecordWriter<MyCommentBean, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
//each of the 3 reduce tasks handles exactly one partition (see MyPartition),
//so each task only needs to open the stream matching its own partition id
int id = context.getTaskAttemptID().getTaskID().getId();
if (id == 0) {
good = fs.create(new Path("D:\\code\\out_comment\\seq_out\\good\\good.txt"));
} else if (id == 1) {
common = fs.create(new Path("D:\\code\\out_comment\\seq_out\\common\\common.txt"));
} else {
bad = fs.create(new Path("D:\\code\\out_comment\\seq_out\\bad\\bad.txt"));
}
MyRecordWriter myRecordWriter = new MyRecordWriter(good, common, bad);
return myRecordWriter;
}
}
(6) reduce (output types: MyCommentBean, NullWritable)
public class SeqReducer extends Reducer<MyCommentBean, NullWritable,MyCommentBean,NullWritable> {
@Override
protected void reduce(MyCommentBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
(7) Custom record writer
public class MyRecordWriter extends RecordWriter<MyCommentBean, NullWritable> {
private FSDataOutputStream good;
private FSDataOutputStream common;
private FSDataOutputStream bad;
public MyRecordWriter(FSDataOutputStream good, FSDataOutputStream common, FSDataOutputStream bad) {
this.good = good;
this.common = common;
this.bad = bad;
}
@Override
public void write(MyCommentBean commentBean, NullWritable nullWritable) throws IOException, InterruptedException {
int commentStatus = commentBean.getCommentStatus();
if (commentStatus==0) {
good.write(commentBean.toString().getBytes());
good.write("\r\n".getBytes());
} else if(commentStatus==1) {
common.write(commentBean.toString().getBytes());
common.write("\r\n".getBytes());
}else{
bad.write(commentBean.toString().getBytes());
bad.write("\r\n".getBytes());
}
}
@Override
public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
IOUtils.closeStream(good);
IOUtils.closeStream(common);
IOUtils.closeStream(bad);
}
}
(8) driver
/**
 * Read the compressed files and write them out grouped by review level
 */
public class SeqDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//get a Job instance
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "SeqDriver");
job.setJarByClass(SeqDriver.class);
//set the mapper
job.setMapperClass(SeqMapper.class);
//set the reducer
job.setReducerClass(SeqReducer.class);
job.setMapOutputKeyClass(MyCommentBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(MyCommentBean.class);
job.setOutputValueClass(NullWritable.class);
//custom partitioner: one partition per review level
job.setPartitionerClass(MyPartition.class);
//set the custom OutputFormat type
job.setOutputFormatClass(MyFileOutputFormat.class);
//one reduce task per review level, matching the partitioner
job.setNumReduceTasks(3);
//input path: seq_input (alternatively custom_input)
FileInputFormat.setInputPaths(job, new Path("D:\\code\\out_comment\\seq_input"));
//output path
FileOutputFormat.setOutputPath(job, new Path("D:\\code\\out_comment\\seq_out"));
//submit the job and wait for completion
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}