IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> MapReduce(2)——编程实战 -> 正文阅读

[大数据]MapReduce(2)——编程实战

一、MapReduce编程须知

map()方法是对输入的一个KV对?调用一次!!?

Reduce()方法是对相同K的一组KV对 调用执行一次?

1.1 Mapper类编程总结

  • 用户自定义一个Mapper类继承Hadoop的Mapper类
  • Mapper的输入数据是KV对的形式(类型可以自定义)
  • Map阶段的业务逻辑定义在map()方法中
  • Mapper的输出数据是KV对的形式(类型可以自定义)

1.2 reduce 类编程总结

  • 用户自定义Reducer类要继承Hadoop的Reducer类
  • Reducer的输入数据类型对应Mapper的输出数据类型(KV对)
  • Reducer的业务逻辑写在reduce()方法中

二、编程实战

2.1 WordCount统计功能

2.1.1 需求

在给定文本文件中统计输出每一个单词出现的总次数

wc.txt

1 2 3 4
5 6 5 6
2 4 7 9

输出结果

1  1
2  2
3  1
4  2
5  2
6  2
7  1
9  1

2.1.2 思路梳理?

Map阶段:

  • 1. map()方法中把传入的数据转为String类型
  • 2. 根据空格切分出单词
  • 3. 输出<单词,1>

Reduce阶段:

  • 1. 汇总各个key(单词)的个数,遍历value数据进行累加
  • 2. 输出key的总数

2.1.3 编程

(1)map (输出数据类型:Text, IntWritable)

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text text = new Text();
    private final IntWritable one = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        String[] values = word.split(" ");
        for (String s : values) {
            text.set(s);
            context.write(text,one);
        }
    }
}

(2)reduce(输出数据类型:Text, IntWritable)

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private final IntWritable reslut = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            int i = value.get();
            sum += i;
        }
        reslut.set(sum);
        context.write(key,reslut);
    }
}

(3)driver

/**
 * 统计源文件单词出现次数
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "WordCountDriver");
        job.setJarByClass(WordCountDriver.class);
        //指定mappper
        job.setMapperClass(WordCountMapper.class);
        //指定reducer
        job.setReducerClass(WordCountReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(IntWritable.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(IntWritable.class);

        //job.setNumReduceTasks(3);
        //输入路径
        FileInputFormat.setInputPaths(job,new Path("D:\\code\\wc\\input\\wc.txt"));
        //输出路径
        FileOutputFormat.setOutputPath(job,new Path("D:\\code\\wc\\output"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

备注:如果driver中?job.setNumReduceTasks(3);

则输出文件会输出3份

??? ?

2.1.4 优化(加入Combiner:预聚合)

(1)map同上(不变)

(2)WordCountCombiner(数据类型:Text, IntWritable,Text,IntWritable)

/**
 * mapTast预合并类
 */
public class WordCountCombiner extends Reducer<Text, IntWritable,Text,IntWritable> {
    private final IntWritable reslut = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            int i = value.get();
            sum += i;
        }
        reslut.set(sum);
        context.write(key,reslut);
    }
}

(3)reduce同上(不变)

(4)WordCountCombinerDriver

在上面的2.1.2的driver类中的‘输出value’ 后面添加 

job.setCombinerClass(WordCountCombiner.class);

2.2 自定义对象分区(id为key,value输出自定义对象)

自定义对象输出:只要把分区CustomPartition、driver.setPartitionerClass去掉就行

2.2.1 需求

文件

01  a00df6s    kar    120.196.100.99 384    33 200
11 0sfs01 kar    120.196.100.99 198    86 200
21 adfd00fd5  pandora    120.196.100.99 513    261    200
31 0ad0s7 pandora    120.196.100.99 840    413    200
41 0sfs01 kar    120.196.100.99 190    401    200
51 00fdaf3    kar    120.196.100.99 273    527    200
61 00fdaf3    pandora    120.196.100.99 527    273    200
71 0a0fe2 kar    120.196.100.99 730    496    200
81 0a0fe2 pandora    120.196.100.99 367    759    200
91 0sfs01 kar    120.196.100.99 529    484    200
101    adfd00fd5  kar    120.196.100.99 516    18 200

...

输出结果

00fdaf3 00fdaf3    32757  33167  65924
00wersa4   00wersa4   30689  35191  65880
0a0fe2 0a0fe2 43085  44254  87339
0ad0s7 0ad0s7 31702  29183  60885
0sfs01 0sfs01 31883  29101  60984
a00df6s    a00df6s    33239  36882  70121
adfd00fd5  adfd00fd5  30727  31491  62218

2.2.2 编程

(1)PartitionBean?

public class PartitionBean implements Writable {
    //id
    private String id;
    //设备id
    private String deviceId;
    //厂商
    private String appKey;
    //ip
    private String ip;
    //自有时长时间
    private long selfDuration;
    //第三方时间
    private long thirdPartDuraton;
    //状态
    private String status;


    public PartitionBean() {
        super();
    }

    public PartitionBean(String id, String deviceId, String appKey, String ip, long selfDuration, long thirdPartDuraton, String status) {
        this.id = id;
        this.deviceId = deviceId;
        this.appKey = appKey;
        this.ip = ip;
        this.selfDuration = selfDuration;
        this.thirdPartDuraton = thirdPartDuraton;
        this.status = status;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(deviceId);
        dataOutput.writeUTF(appKey);
        dataOutput.writeUTF(ip);
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuraton);
        dataOutput.writeUTF(status);

    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.deviceId = dataInput.readUTF();
        this.appKey = dataInput.readUTF();
        this.ip = dataInput.readUTF();
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuraton = dataInput.readLong();
        this.status = dataInput.readUTF();
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    public String getAppKey() {
        return appKey;
    }

    public void setAppKey(String appKey) {
        this.appKey = appKey;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public long getThirdPartDuraton() {
        return thirdPartDuraton;
    }

    public void setThirdPartDuraton(long thirdPartDuraton) {
        this.thirdPartDuraton = thirdPartDuraton;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "{" +id + "\t" + deviceId + "\t" + appKey + "\t" +  ip +  selfDuration + "\t" + thirdPartDuraton + "\t" + status + '}';
    }
  }

(1)map (输出数据类型:Text, PartitionBean)

public class PartitionMapper extends Mapper<LongWritable, Text, Text, PartitionBean> {
    private final Text text = new Text();
    private PartitionBean partitionBean = new PartitionBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        //按\t切割,读取设备id,自有时长和第三方时长
        String[] arraySpeakProp = word.split("\t");
        String id = arraySpeakProp[0];//id
        String deviceId = arraySpeakProp[1];//设备id
        String appKey = arraySpeakProp[2];//厂商
        String ip = arraySpeakProp[3];//ip
        String selfDuration = arraySpeakProp[4];//自有时长
        String thirdPartduration = arraySpeakProp[5];//第三方时长
        String status = arraySpeakProp[6];//状态

        partitionBean.setId(id);
        partitionBean.setDeviceId(deviceId);
        partitionBean.setAppKey(appKey);
        partitionBean.setIp(ip);
        partitionBean.setSelfDuration(Long.parseLong(selfDuration));
        partitionBean.setThirdPartDuraton(Long.parseLong(thirdPartduration));
        partitionBean.setStatus(status);
        text.set(appKey);
        context.write(text, partitionBean);
    }
}

(2)reduce (输出数据类型:Text, PartitionBean)


public class PartitionReducer extends Reducer<Text, PartitionBean, Text, PartitionBean> {
    @Override
    protected void reduce(Text key, Iterable<PartitionBean> values, Context context) throws IOException, InterruptedException {
        for (PartitionBean bean : values) {
            context.write(key,bean);
        }
    }
}

(3)CustomPartition

/**
 * 自定义分区
 */
public class CustomPartition extends Partitioner<Text, PartitionBean> {

    @Override
    public int getPartition(Text text, PartitionBean partitionBean, int i) {
        if (text.toString().equals("kar")){
            return 0;
        }else if(text.toString().equals("pandora")){
            return 1;
        }else{
            return 2;
        }
    }
}

(4)driver

/**
 * 实现mapTast阶段的自定义分区CustomPartition
 */
public class PartitionDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "PartitionDriver");
        job.setJarByClass(PartitionDriver.class);
        //指定mappper
        job.setMapperClass(PartitionMapper.class);
        //指定reducer
        job.setReducerClass(PartitionReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(PartitionBean.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(PartitionBean.class);
        //自定义分区
        job.setPartitionerClass(CustomPartition.class);
        job.setNumReduceTasks(3);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\speak\\input\\speak.data"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\speak\\out_partition"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.3自定义对象全排序(id为自定义对象,value输出NullWritable)

2.3.1 需求

文件

01  a00df6s    kar    120.196.100.99 384    33 200
11 0sfs01 kar    120.196.100.99 198    86 200
21 adfd00fd5  pandora    120.196.100.99 513    261    200
31 0ad0s7 pandora    120.196.100.99 840    413    200
41 0sfs01 kar    120.196.100.99 190    401    200
51 00fdaf3    kar    120.196.100.99 273    527    200
61 00fdaf3    pandora    120.196.100.99 527    273    200
71 0a0fe2 kar    120.196.100.99 730    496    200
81 0a0fe2 pandora    120.196.100.99 367    759    200
91 0sfs01 kar    120.196.100.99 529    484    200
101    adfd00fd5  kar    120.196.100.99 516    18 200

输出结果

2.3.2 编程

(1)SpeakBeanComparable

/**
 * 自定义beam实现comparable
 */
public class SpeakBeanComparable implements WritableComparable<SpeakBeanComparable> {
    //自有时长时间
    private long selfDuration;
    //第三方时间
    private long thirdPartDuraton;
    //总时长
    private long sumDuration;
    //设备id
    private String deviceId;

    public SpeakBeanComparable() {
        super();
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(selfDuration);
        dataOutput.writeLong(thirdPartDuraton);
        dataOutput.writeLong(sumDuration);
        dataOutput.writeUTF(deviceId);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.selfDuration = dataInput.readLong();
        this.thirdPartDuraton = dataInput.readLong();
        this.sumDuration = dataInput.readLong();
        this.deviceId = dataInput.readUTF();
    }

    public long getSelfDuration() {
        return selfDuration;
    }

    public void setSelfDuration(long selfDuration) {
        this.selfDuration = selfDuration;
    }

    public long getThirdPartDuraton() {
        return thirdPartDuraton;
    }

    public void setThirdPartDuraton(long thirdPartDuraton) {
        this.thirdPartDuraton = thirdPartDuraton;
    }

    public Long getSumDuration() {
        return sumDuration;
    }

    public void setSumDuration(Long sumDuration) {
        this.sumDuration = sumDuration;
    }

    public String getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(String deviceId) {
        this.deviceId = deviceId;
    }

    @Override
    public String toString() {
        return   deviceId + "\t" + selfDuration + "\t" + thirdPartDuraton + "\t" + sumDuration ;
    }

    @Override
    public int compareTo(SpeakBeanComparable o) {
        if(this.getSumDuration()<o.getSumDuration()){
            return -1;
        }else if(this.getSumDuration()>o.getSumDuration()){
            return 1;
        }else{
            return 0;
        }
    }
}

(2)map (输出数据类型:SpeakBeanComparable, NullWritable)

public class SpeakSortMapper extends Mapper<LongWritable, Text, SpeakBeanComparable, NullWritable> {
    final SpeakBeanComparable speakBean = new SpeakBeanComparable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        //按\t切割,读取设备id,自有时长和第三方时长
        String[] arraySpeakProp = word.split("\t");
        String deviceId = arraySpeakProp[1];//设备id
        String selfDuration = arraySpeakProp[arraySpeakProp.length - 3];//自有时长
        String thirdPartduration = arraySpeakProp[arraySpeakProp.length - 2];//第三方时长
        String sumDuration = arraySpeakProp[arraySpeakProp.length - 1];//总时长

        speakBean.setDeviceId(deviceId);
        speakBean.setSelfDuration(Long.parseLong(selfDuration));
        speakBean.setThirdPartDuraton(Long.parseLong(thirdPartduration));
        speakBean.setSumDuration(Long.parseLong(sumDuration));
        context.write(speakBean, NullWritable.get());
    }
}

(3)reduce (输出数据类型:SpeakBeanComparable, NullWritable)

public class SpeakSortReducer extends Reducer<SpeakBeanComparable,NullWritable, SpeakBeanComparable,NullWritable> {
    private SpeakBeanComparable result = new SpeakBeanComparable();

    @Override
    protected void reduce(SpeakBeanComparable key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable value : values) {
            context.write(key,value);
        }
    }
}

(4)driver

/**
 * 业务场景:源文件内容按某字段进行排序输出
 * 实现:自定义bean对象作为key 实现comparable进行指定字段排序
 */
public class SpeakSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SpeakSortDriver");
        job.setJarByClass(SpeakSortDriver.class);
        //指定mappper
        job.setMapperClass(SpeakSortMapper.class);
        //指定reducer
        job.setReducerClass(SpeakSortReducer.class);
        //指定key
        job.setMapOutputKeyClass(SpeakBeanComparable.class);
        //指定value
        job.setMapOutputValueClass(NullWritable.class);
        //指定输出key
        job.setOutputKeyClass(SpeakBeanComparable.class);
        //指定输出value
        job.setOutputValueClass(NullWritable.class);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\speak\\out_bean"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\speak\\out_sort"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

备注:

  • 对于全局排序需要保证只有一个reduceTask!!
  • 默认reduceTask数量是1个;

2.4分组输出(继承WritableComparator,重写compare方法

,根据条件判断为同一对象进行输出)

2.4.1 需求

文件

订单id     商品id  价格
0000001 ???Pdt_01 222.8
0000002    Pdt_05 722.4
0000001    Pdt_02 33.8
0000003    Pdt_06 232.8
0000003    Pdt_02 33.8
0000002    Pdt_03 522.8
0000002    Pdt_04 122.4

输出结果

求出每一个订单中成交金额最大的一笔交易。

0000001    222.8
0000002    722.4
0000003    232.8

2.4.2 编程

(1)对象

/**
 * 自定义beam实现comparable,先按订单id排序,然后按价格倒序
 */
public class OrderBeanCompare implements WritableComparable<OrderBeanCompare> {
    //价格
    private Double price;
    //订单id
    private String orderId;

    public OrderBeanCompare() {
        super();
    }


    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeDouble(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.price = dataInput.readDouble();
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    @Override
    public String toString() {
        return orderId + "\t" + price;
    }

    @Override
    public int compareTo(OrderBeanCompare o) {
        int res = this.orderId.compareTo(o.orderId);
        if (res == 0) {
            res = -this.price.compareTo(o.price);
        }
        return res;
    }
}

(2)map (输出数据类型:OrderBeanCompare?, NullWritable)

public class OrderMapper extends Mapper<LongWritable, Text, OrderBeanCompare, NullWritable> {
    final OrderBeanCompare orderBean = new OrderBeanCompare();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        String[] arry = word.split("\t");
        String orderId = arry[0];//设备id
        String price = arry[2];//价钱
        orderBean.setOrderId(orderId);
        orderBean.setPrice(Double.parseDouble(price));
        context.write(orderBean, NullWritable.get());
    }
}

(3)自定义分区

/**
 * 根据订单id进行分区
 */
public class OrderPartition extends Partitioner<OrderBeanCompare, NullWritable> {
    @Override
    public int getPartition(OrderBeanCompare orderBean, NullWritable nullWritable, int numReduceTasks) {
        return (orderBean.getOrderId().hashCode() & 2147483647) % numReduceTasks;
    }
}

(4)自定义分组

/**
 * 根据订单id一样,就判断为同一对象
 */
public class OrderGroupCompareTor extends WritableComparator {
    public OrderGroupCompareTor() {
        super(OrderBeanCompare.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBeanCompare order1 = (OrderBeanCompare) a;
        OrderBeanCompare order2 = (OrderBeanCompare) b;
        return order1.getOrderId().compareTo(order2.getOrderId());
    }
}

(5)reduce (输出数据类型:OrderBeanCompare?, NullWritable)

public class OrderReducer extends Reducer<OrderBeanCompare, NullWritable, OrderBeanCompare, NullWritable> {
    @Override
    protected void reduce(OrderBeanCompare key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

(6)driver

/**
 * 业务场景:挑选出订单里最高金额的记录
 * 实现:自定义bean对象作为key 实现comparable进行指定字段排序
 * 先按订单id排序,然后按价格倒序
 */
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "OrderDriver");
        job.setJarByClass(OrderDriver.class);
        //指定mappper
        job.setMapperClass(OrderMapper.class);
        //指定reducer
        job.setReducerClass(OrderReducer.class);
        //指定key
        job.setMapOutputKeyClass(OrderBeanCompare.class);
        //指定value
        job.setMapOutputValueClass(NullWritable.class);
        //指定输出key
        job.setOutputKeyClass(OrderBeanCompare.class);
        //指定输出value
        job.setOutputValueClass(NullWritable.class);

        job.setPartitionerClass(OrderPartition.class);
        job.setGroupingComparatorClass(OrderGroupCompareTor.class);
        //job.setNumReduceTasks(3);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\groupcomparetor\\groupingComparator.txt"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\groupcomparetor\\order_out"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.5Join实战(小表join大表)

2.5.1 需求

文件

177725422   产品经理
177725433  大数据开发工程师
1001    177725422  2020-01-03
1002   177725422  2020-01-04
1002   177725433  2020-01-03

输出结果

1001    177725422  2020-01-03 产品经理
1002   177725422  2020-01-04 产品经理
1002   177725433  2020-01-03 大数据开发工程师

2.5.2 编程

(1)map (输出数据类型:Text, NullWritable)

/**
 * 在mapTast阶段小表进行缓存
 */
public class MapperJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    final HashMap<String, String> map= new HashMap<>();;
    Text k =new Text();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputStreamReader inputStreamReader = new InputStreamReader(new FileInputStream(new File("D:/code/map_join/cache/position.txt")),"UTF-8");
        BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
        String line ;
        while (StringUtils.isNotEmpty(line=bufferedReader.readLine())){
            String[] split = line.split("\t");
            map.put(split[0],split[1]);
        }
        bufferedReader.lines();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        String positionId = split[1];
        k.set(value.toString()+"\t"+map.get(positionId));
        context.write(k,NullWritable.get());
    }
}

(2)driver

/**
 * 大表连接小表
 */
public class MapperJoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "MapperJoinDriver");
        job.setJarByClass(MapperJoinDriver.class);
        //指定mappper
        job.setMapperClass(MapperJoinMapper.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:/code/map_join/input/deliver_info.txt"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:/code/map_join/out"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.6CombineTextInputFormat数据输入(合并小文件wordcount)

MR框架默认的TextInputFormat切片机制按文件划分切片,

文件无论多小,都是单独一个切片, 然后由一个MapTask处理,如果有大量小文件,就对应的会生成并启动大量的 MapTask,而每个 MapTask处理的数据量很小大量时间浪费在初始化资源启动收回等阶段,这种方式导致资源利用 率不高。

CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上划分成一个切 片,这样多个小文件就可以交给一个MapTask处理,提高资源利用率。

具体使用方式

// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class);

//虚拟存储切片最大值设置4m

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304)

使用示例

假设设置setMaxInputSplitSize值为4M

四个小文件:1.txt -->2M ;2.txt-->7M;3.txt-->0.3M;4.txt--->8.2M

根据是否大于4,如果大于,对半切

2M,3.5M,3.5M,0.3M,4M,2.1M,2.1M 共7个虚拟存储块。

然后依次加以来,使大于4M就形成一个文件

最终会形成3个切片,大小分别为: (2+3.5)M,(3.5+0.3+4)M,(2.1+2.1)M

2.6.1 需求

文件

合并wordcount小文件

输出结果

2.6.2 编程

(1)map (输出数据类型:Text, IntWritable)

public class TextInputformatMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text text = new Text();
    private final IntWritable one = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        String[] values = word.split("\t");
        text.set(values[0]);
        context.write(text,one);
    }
}

(2)reduce (输出数据类型:Text, IntWritable)

public class TextInputformatReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private final IntWritable reslut = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            int i = value.get();
            sum += i;
        }
        reslut.set(sum);
        context.write(key,reslut);
    }
}

(3)driver

/**
 * 合并小文件 CombineTextInputFormat
 */
public class TextInputformatDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "TextInputformatDriver");
        job.setJarByClass(TextInputformatDriver.class);
        //指定mappper
        job.setMapperClass(TextInputformatMapper.class);
        //指定reducer
        job.setReducerClass(TextInputformatReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(IntWritable.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(IntWritable.class);
        // 如果不设置InputFormat,它默认用的是TextInputFormat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        //虚拟存储切片最大值设置4m
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.7自定义InputFormat(合并小文件)

HDFS还是MapReduce,在处理小文件时效率都非常低,但又难免面临处理大量小文件的场景, 此时,就需要有相应解决方案。

可以自定义InputFormat实现小文件的合并。?

2.7.1 需求

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的 key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为value

文件

输出结果

得到一个合并了多个小文件的SequenceFile文

思路

1. 定义一个类继承FileInputFormat 2. 重写isSplitable()指定为不可切分;重写createRecordReader()方法,创建自己的 RecorderReader对象 3. 改变默认读取数据方式,实现一次读取一个完整文件作为kv输出; 4. Driver指定使用的InputFormat类型

2.7.2 编程

(1)自定义fileInputFormat(输出数据类型:Text, BytesWritable)

/**
 * 自定义输入文件,合并小文件成二进制文件
 */
public class CustomFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        CustomRecordReader customRecordReader = new CustomRecordReader();
        customRecordReader.initialize(inputSplit,context);
        return customRecordReader;
    }
}

(2)自定义Reader(输出数据类型:Text, BytesWritable)

public class CustomRecordReader extends RecordReader<Text, BytesWritable> {
    private Boolean isProcess = true;
    private FileSplit fileSplit;
    private Configuration configuration;
    private Text key = new Text();
    private BytesWritable fileValue = new BytesWritable();
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) inputSplit;
        this.configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (isProcess) {
            //文件路径
            Path path = fileSplit.getPath();
            //文件大小
            long length = fileSplit.getLength();
            //读取文件
            FileSystem fs = path.getFileSystem(configuration);
            FSDataInputStream input = fs.open(path);
            byte[] contents = new byte[(int) length];
            //文件存入contents
            IOUtils.readFully(input, contents, 0, contents.length);
            //写出value  contents
            fileValue.set(contents,0,contents.length);
            //设置key
            key.set(path.toString());
            IOUtils.closeStream(input);
            isProcess = false;//一个文件就读一次,所以这里设置为false
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return fileValue;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {

    }
}

(3)map (输出数据类型:Text, BytesWritable)

public class CustomInputFormatMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException {
       context.write(key,value);
    }
}

(4)reduce (输出数据类型:Text, BytesWritable)

public class CustomInputFormatReducer extends Reducer<Text, BytesWritable,Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,values.iterator().next());
    }
}

(5)driver

/**
 * 合并小文件成二进制文件(缺点:有多少小文件就有多少mapTask任务)
 */
public class CustomInputFormatDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomInputFormatDriver");
        job.setJarByClass(CustomInputFormatDriver.class);
        //指定mappper
        job.setMapperClass(CustomInputFormatMapper.class);
        //指定reducer
        job.setReducerClass(CustomInputFormatReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(BytesWritable.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(BytesWritable.class);

        job.setInputFormatClass(CustomFileInputFormat.class);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\byte\\input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\byte\\custom_out"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

打断点结论:每个文件走一次?

CustomFileInputFormat ——>CustomRecordReader?——>map——>reduce

2.8自定义outputFormat

2.8.1 需求

文件

输出结果

2.8.2 编程

(1)map (输出数据类型:Text, NullWritable)


public class CustomOutputFormatMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}

(2)自定义CustomFileOutputFormat?(输出数据类型:Text, NullWritable)

/**
 * 自定义输出文件
 */
public class CustomFileOutputFormat extends FileOutputFormat<Text, NullWritable> {


    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataOutputStream out= fs.create(new Path("D:\\code\\outputformat\\out\\baidu.txt"));
        FSDataOutputStream other = fs.create(new Path("D:\\code\\outputformat\\out\\other.txt"));
        CustomRecordWriter customRecordWriter = new CustomRecordWriter(out,other);
        return customRecordWriter;
    }
}

(3)reduce (输出数据类型:Text, NullWritable)

public class CustomOutputFormatReducer extends Reducer<Text, NullWritable,Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

(4)CustomRecordWriter?(输出数据类型:Text, NullWritable)


public class CustomRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream out;
    private FSDataOutputStream otherOut;

    public CustomRecordWriter(FSDataOutputStream out, FSDataOutputStream other) {
        this.out= out;
        this.otherOut = other;
    }

    @Override
    public void write(Text text, NullWritable nullWritable) throws IOException, InterruptedException {
        String key = text.toString();
        if (key.contains("baidu")) {
            out.write(key.getBytes());
            out.write("\r\n".getBytes());
        } else {
            otherOut.write(key.getBytes());
            otherOut.write("\r\n".getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(out);
        IOUtils.closeStream(otherOut);
    }
}

(3)driver

/**
 * 输出文件进行内容划分
 */
public class CustomOutputFormatDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "CustomInputFormatDriver");
        job.setJarByClass(CustomOutputFormatDriver.class);
        //指定mappper
        job.setMapperClass(CustomOutputFormatMapper.class);
        //指定reducer
        job.setReducerClass(CustomOutputFormatReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(NullWritable.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(CustomFileOutputFormat.class);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\outputformat\\input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\outputformat\\custom_out"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

打断点结论:

一个文件一个mapTask,map每行调一次,但FileOut整个过程只调了一次,然后reduce每行(去重后的每行)调一次,紧接着调writer(去重后的每行调一次)

2.9 压缩文件

2.9.1?需求

把文件进行压缩

文件

输出结果

2.9.2 编程

(1)driver

public class SequenceFileOutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SequenceFileOutputDriver");
        //尽可能降低数据的量,减少磁盘空间的占用,网络间通信时数据量小可以节省时间
        //针对Sequencefile的压缩
        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        //压缩类型:record压缩
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

(2)读取压缩文件

map

public class TextInputformatMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Text text = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        text.set(value.toString());
        context.write(text,NullWritable.get());
    }
}

reduce

public class TextInputformatReducer extends Reducer<Text, IntWritable,Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

driver

/**
 * 查看压缩文件输出
 */
public class TextInputformatDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "TextInputformatDriver");
        job.setJarByClass(TextInputformatDriver.class);
        //指定mappper
        job.setMapperClass(TextInputformatMapper.class);
        //指定reducer
        job.setReducerClass(TextInputformatReducer.class);
        //指定key
        job.setMapOutputKeyClass(Text.class);
        //指定value
        job.setMapOutputValueClass(NullWritable.class);
        //指定输出key
        job.setOutputKeyClass(Text.class);
        //指定输出value
        job.setOutputValueClass(NullWritable.class);

        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\inputformat\\text\\output"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\inputformat\\text\\output2"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

2.10 合并小文件,自定义输入,自定义输出文件名称

2.10.1 需求

现在有一些订单的评论数据,需求,将订单按照好评与差评区分开来,将数据输出到不同的文件目录 下,数据内容如下,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评。 现需要根据好评,中评,差评把数据分类并输出到不同的目录中,并且要求按照时间顺序降序排列。

文件

1	我想再来一个	\N	1	3	hello	来就来吧	0	2018-03-14 22:29:03
2	好的	\N	1	1	添加一个吧	说走咱就走啊	0	2018-03-14 22:42:04
3	haobuhao	\N	1	1	nihao		0	2018-03-24 22:55:17
4	店家很好 非常好	\N	1	3	666	谢谢	0	2018-03-23 11:15:20
5	a'da'd	\N	0	4	da'd's	打打操作	0	2018-03-22 14:52:42
6	达到	\N	1	4	1313	13132131	0	2018-03-07 14:30:38
7	321313	\N	1	4	111	1231231	1	2018-03-06 14:54:24
8	1	\N	1	4	1	110000	0	2018-03-23 14:54:42
9	666666666666666	\N	1	4	4444	7777777777777777	0	2018-03-01 10:58:24
10	00000000	\N	1	4	55555555555	009999999	1	2018-03-02 10:58:47
11	311111111111	\N	0	4	999999	1333333333	2	2018-03-03 10:59:10

输出结果

思路分析

现在有大量类似上面的小文件!

step1 :压缩文件

自定义分区根据评论等级把数据分区

自定义OutputFormat把数据输出到多个目录

2.10.2 编程

(1)压缩文件

/**
 * 压缩文件
 */
public class SequenceFileOutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SequenceFileOutputDriver");
        //尽可能降低数据的量,减少磁盘空间的占用,网络间通信时数据量小可以节省时间
        //针对Sequencefile的压缩
        SequenceFileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        //压缩类型:record压缩
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.RECORD);
        //输入路径
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\out_comment\\combine_text_input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\out_comment\\seq_input"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

(2)自定义对象

public class MyCommentBean implements WritableComparable<MyCommentBean> {
    private String orderId;
    private String comment;
    private String commentExt;
    private int goodsNum;
    private String phoneNum;
    private String userName;
    private String address;
    private int commentStatus;
    private String commentTime;

    @Override
    public String toString() {
        return orderId+"\t"+comment+"\t"+commentExt+"\t"+goodsNum+"\t"+phoneNum+"\t"+userName+"\t"+address+"\t"+commentStatus+"\t"+commentTime;
    }
//无参构造

    public MyCommentBean() {
    }

    public MyCommentBean(String orderId, String comment, String commentExt, int goodsNum, String phoneNum, String userName, String address, int commentStatus, String commentTime) {
        this.orderId = orderId;
        this.comment = comment;
        this.commentExt = commentExt;
        this.goodsNum = goodsNum;
        this.phoneNum = phoneNum;
        this.userName = userName;
        this.address = address;
        this.commentStatus = commentStatus;
        this.commentTime = commentTime;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    public String getCommentExt() {
        return commentExt;
    }

    public void setCommentExt(String commentExt) {
        this.commentExt = commentExt;
    }

    public int getGoodsNum() {
        return goodsNum;
    }

    public void setGoodsNum(int goodsNum) {
        this.goodsNum = goodsNum;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public int getCommentStatus() {
        return commentStatus;
    }

    public void setCommentStatus(int commentStatus) {
        this.commentStatus = commentStatus;
    }

    public String getCommentTime() {
        return commentTime;
    }

    public void setCommentTime(String commentTime) {
        this.commentTime = commentTime;
    }

    //定义排序规则,按照时间降序;0,1,-1
    @Override
    public int compareTo(MyCommentBean o) {
        return o.getCommentTime().compareTo(this.commentTime);
    }

    //序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeUTF(comment);
        out.writeUTF(commentExt);
        out.writeInt(goodsNum);
        out.writeUTF(phoneNum);
        out.writeUTF(userName);
        out.writeUTF(address);
        out.writeInt(commentStatus);
        out.writeUTF(commentTime);
    }

    //反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.comment = in.readUTF();
        this.commentExt = in.readUTF();
        this.goodsNum = in.readInt();
        this.phoneNum = in.readUTF();
        this.userName = in.readUTF();
        this.address = in.readUTF();
        this.commentStatus = in.readInt();
        this.commentTime = in.readUTF();
    }
}

(3)map (输出数据类型:MyCommentBean, NullWritable)

public class SeqMapper extends Mapper<LongWritable, Text, MyCommentBean, NullWritable> {
    MyCommentBean commentBean = new MyCommentBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String word = value.toString();
        String[] lineValue = word.split("\t");
        if (lineValue.length >= 10) {
            commentBean.setOrderId(lineValue[1]);
            commentBean.setComment(lineValue[2]);
            commentBean.setCommentExt(lineValue[3]);
            commentBean.setGoodsNum(Integer.parseInt(lineValue[4]));
            commentBean.setPhoneNum(lineValue[5]);
            commentBean.setUserName(lineValue[6]);
            commentBean.setAddress(lineValue[7]);
            commentBean.setCommentStatus(Integer.parseInt(lineValue[8]));
            commentBean.setCommentTime(lineValue[9]);
        }
        context.write(commentBean,NullWritable.get());
    }
}

(4)自定义分区

/**
 * 自定义分区
 */
public class MyPartition extends Partitioner<MyCommentBean, NullWritable> {

    @Override
    public int getPartition(MyCommentBean commentBean, NullWritable nullWritable, int i) {
        return commentBean.getCommentStatus();
    }
}

(5)自定义输出文件

/**
 * 自定义输出文件
 */
public class MyFileOutputFormat extends FileOutputFormat<MyCommentBean, NullWritable> {
    FSDataOutputStream good;
    FSDataOutputStream common;
    FSDataOutputStream bad;

    @Override
    public RecordWriter<MyCommentBean, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        int id = context.getTaskAttemptID().getTaskID().getId();
        if (id == 0) {
            good = fs.create(new Path("D:\\code\\out_comment\\seq_out\\good\\good.txt"));
        } else if (id == 1) {
            common = fs.create(new Path("D:\\code\\out_comment\\seq_out\\common\\common.txt"));
        } else {
            bad = fs.create(new Path("D:\\code\\out_comment\\seq_out\\bad\\bad.txt"));
        }
        MyRecordWriter myRecordWriter = new MyRecordWriter(good, common, bad);
        return myRecordWriter;
    }
}

(6)reduce (输出数据类型:MyCommentBean, NullWritable)

public class SeqReducer extends Reducer<MyCommentBean, NullWritable,MyCommentBean,NullWritable> {
    @Override
    protected void reduce(MyCommentBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

?(7)自定义写出类

public class MyRecordWriter extends RecordWriter<MyCommentBean, NullWritable> {
    private FSDataOutputStream good;
    private FSDataOutputStream common;
    private FSDataOutputStream bad;


    public MyRecordWriter(FSDataOutputStream good, FSDataOutputStream common, FSDataOutputStream bad) {
        this.good = good;
        this.common = common;
        this.bad = bad;
    }

    @Override
    public void write(MyCommentBean commentBean, NullWritable nullWritable) throws IOException, InterruptedException {
        int commentStatus = commentBean.getCommentStatus();
        if (commentStatus==0) {
            good.write(commentBean.toString().getBytes());
            good.write("\r\n".getBytes());
        } else if(commentStatus==1) {
            common.write(commentBean.toString().getBytes());
            common.write("\r\n".getBytes());
        }else{
            bad.write(commentBean.toString().getBytes());
            bad.write("\r\n".getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        IOUtils.closeStream(good);
        IOUtils.closeStream(common);
        IOUtils.closeStream(bad);
    }
}

(8)driver

/**
 * 对压缩文件进行统计输出
 */
public class SeqDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取job实例
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "SeqDriver");
        job.setJarByClass(SeqDriver.class);
        //指定mappper
        job.setMapperClass(SeqMapper.class);
        //指定reducer
        job.setReducerClass(SeqReducer.class);

        job.setMapOutputKeyClass(MyCommentBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(MyCommentBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setPartitionerClass(MyPartition.class);
        //指定输出outputformat类型
        job.setOutputFormatClass(MyFileOutputFormat.class);
        job.setNumReduceTasks(3);
        //输入路径:seq_input,custom_input
        FileInputFormat.setInputPaths(job, new Path("D:\\code\\out_comment\\seq_input"));
        //输出路径
        FileOutputFormat.setOutputPath(job, new Path("D:\\code\\out_comment\\seq_out"));
        //提交
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-05 20:22:40  更:2021-07-05 20:23:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 19:09:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码