IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> bd210830-MapReduce -> 正文阅读

[大数据]bd210830-MapReduce

  1. 统计单词出现的次数,将a-n开头的单词放??个?件,o-z开头的单词放??个?件, 其他的放??个?件。
    1.1 自定义mapper类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;


public class myMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    Text k2 = new Text();
    IntWritable v2 = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lineRecord = value.toString();
        System.out.println(lineRecord);
        String[] words = lineRecord.split(" ");
        for (String word : words) {
            k2.set(word);
            v2.set(1);
            try {
                context.write(k2,v2);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
1.2 自定义reducer类
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class wordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    @Override
    public void reduce(Text k2, Iterable<IntWritable> v2, Context context){
        System.out.println("123");
        int sum = 0;
        for (IntWritable intWritable : v2) {
            sum+=intWritable.get();
        }
        //sum就是单词个数
        System.out.println("sum"+sum);
        IntWritable v3 = new IntWritable(sum);
        try {
            context.write(k2,v3);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
1.3 自定义分片规则
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordcountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        if(text.toString().matches("^[a-iA-I].*"))
            return 0;
        else return 1;
    }
}
1.4 启动类,设置参数
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class MyDriver {
    public static void main(String[] args) {
        try {
            Configuration entries = new Configuration();

            Job job = Job.getInstance(entries);
            job.setJarByClass(MyDriver.class);

            job.setMapperClass(myMapper.class);
            job.setReducerClass(wordCountReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setNumReduceTasks(2);

            job.setPartitionerClass(WordcountPartitioner.class);
            //设置输入和输出路径
            FileInputFormat.addInputPath(job,new Path("E:/y/*"));
            Path path = new Path("E:/y/output");
            FileSystem fileSystem = FileSystem.get(entries);
            if(fileSystem.exists(new Path("E:/y/output"))){
                fileSystem.delete(new Path("E:/y/output"),true);
            }
            FileOutputFormat.setOutputPath(job,path);

            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 绘制入门案例wordcount的图解

在这里插入图片描述

  1. 数据如下
7369,SMITH,CLERK,7902,1980-12-17,800,null,20
7499,ALLEN,SALESMAN,7698,1981-02-20,1600,300,30
7521,WARD,SALESMAN,7698,1981-02-22,1250,500,30
7566,JONES,MANAGER,7839,1981-04-02,2975,null,20
7654,MARTIN,SALESMAN,7698,1981-09-28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981-05-01,2850,null,30
7782,CLARK,MANAGER,7839,1981-06-09,2450,null,10
7788,SCOTT,ANALYST,7566,1987-04-19,3000,null,20
7839,KING,PRESIDENT,null,1981-11-17,5000,null,10
7844,TURNER,SALESMAN,7698,1981-09-08,1500,0,30
7876,ADAMS,CLERK,7788,1987-05-23,1100,null,20
7900,JAMES,CLERK,7698,1981-12-03,950,null,30
7902,FORD,ANALYST,7566,1981-12-02,3000,null,20
7934,MILLER,CLERK,7782,1982-01-23,1300,null,10

使用mr程序统计每年入职的人数。

最终结果要求如下:

1. 格式如下:  
	年份:1980  人数:xxx
	年份:1981  人数:xxx
	.......
2. 两个分区:
	 0分区存储  入职年份<1982年的
	 1分区存储  入职年份>=1982年的

PersonCountMapper

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PersonCountMapper extends Mapper<LongWritable,Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
        String s = v1.toString();
        String year = s.split(",")[4].split("-")[0];
        System.out.println(year);
        context.write(new Text(year),new IntWritable(1));
    }
}

PersonCountReducer

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PersonCountReducer extends Reducer<Text, IntWritable,Text,Text> {
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable intWritable : v2) {
            sum+=intWritable.get();
        }
        context.write(new Text("年份:"+k2),new Text("人数:"+sum));
    }
}

PersonCountPartitioner

public class PersonCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable i1, int i) {
        int n = Integer.parseInt(text.toString());
        return (n<1982)?0:1;
    }
}

PersonCountDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PersonCountDriver {
    public static void main(String[] args) {
        try {
            Configuration entries = new Configuration();
            Job job = Job.getInstance(entries);
            job.setJarByClass(PersonCountDriver.class);

            job.setMapperClass(PersonCountMapper.class);
            job.setReducerClass(PersonCountReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setNumReduceTasks(2);

            job.setPartitionerClass(PersonCountPartitioner.class);
            //设置输入和输出路径
            FileInputFormat.addInputPath(job,new Path("E:/y/person.txt"));
            Path path = new Path("E:/y/output2");
            FileSystem fileSystem = FileSystem.get(entries);
            if(fileSystem.exists(new Path("E:/y/output2"))){
                fileSystem.delete(new Path("E:/y/output2"),true);
            }
            FileOutputFormat.setOutputPath(job,path);

            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-31 15:30:56  更:2021-08-31 15:31:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:52:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码