IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 超详细的MapReduce WordCount 统计微博评论最多的用户 -> 正文阅读

[大数据]超详细的MapReduce WordCount 统计微博评论最多的用户

超详细的MapReduce WordCount 统计微博评论最多的用户

使用fastjson解析每一行的json

List<Map<String,Object>> parses = (List<Map<String,Object>>) JSON.parse(value.toString());

提取userId

for (Map<String, Object> pars : parses) {
            String new_value = (String) pars.get("userId");
            context.write(new IntWritable(1),new Text(new_value));
        }

Mapper完整代码

package anu.mapereduce;

import com.alibaba.fastjson.JSON;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 *  yucheng_gu
 */
public class MainMapper extends Mapper<LongWritable, Text,IntWritable,Text >{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        List<Map<String,Object>> parses = (List<Map<String,Object>>) JSON.parse(value.toString());
        for (Map<String, Object> pars : parses) {
            String new_value = (String) pars.get("userId");
            context.write(new IntWritable(1),new Text(new_value));
        }
    }
}

reduce查找每个用户的出现数量

Map<String,Integer> navs = new HashMap<>();
        for (Text value : values) {
            Integer integer = navs.get(value.toString());
            if (integer == null){
                navs.put(value.toString(),1);
            }else {
                navs.put(value.toString(),integer+1);
            }
        }

把所有用户的评论数量的信息做排序

List<String> llas = new ArrayList<>();
        for (String keys_l : navs.keySet()) {
            Integer is_v = 0;
            String nname = "null";
            Map<String,Integer> new_navs=new HashMap<>();
            for (String keyaa : navs.keySet()) {
                if (! llas.contains(keyaa)){
                    new_navs.put(keyaa,navs.get(keyaa));
                }
            }
            for (String keys : new_navs.keySet()) {
                if(new_navs.get(keys)>is_v){
                    is_v = new_navs.get(keys);
                    nname = keys;
                }
            }
            llas.add(nname);
        }

输出数据

for (String lla : llas) {
            context.write(new Text(lla),new IntWritable(navs.get(lla)));
        }

Reduce完整代码

package anu.mapereduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

public class MainReduce extends Reducer< IntWritable,Text,Text, IntWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Map<String,Integer> navs = new HashMap<>();
        for (Text value : values) {
            Integer integer = navs.get(value.toString());
            if (integer == null){
                navs.put(value.toString(),1);
            }else {
                navs.put(value.toString(),integer+1);
            }
        }
        List<String> llas = new ArrayList<>();
        for (String keys_l : navs.keySet()) {
            Integer is_v = 0;
            String nname = "null";
            Map<String,Integer> new_navs=new HashMap<>();
            for (String keyaa : navs.keySet()) {
                if (! llas.contains(keyaa)){
                    new_navs.put(keyaa,navs.get(keyaa));
                }
            }
            for (String keys : new_navs.keySet()) {
                if(new_navs.get(keys)>is_v){
                    is_v = new_navs.get(keys);
                    nname = keys;
                }
            }
            llas.add(nname);
        }
        for (String lla : llas) {
            context.write(new Text(lla),new IntWritable(navs.get(lla)));
        }
    }
}

为了方便调试不依赖集群运行,使用本地运行,具体的方法可自己百度

WordCountRunner 启动类的完整代码

package anu.mapereduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

/**
 *  yucheng_gu
 */
public class WordCountRunner{
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        //注册本地hadoop驱动
        System.setProperty("hadoop.home.dir","D:\\LocalServer\\hadoop-2.9.2");
        Configuration configuration = new Configuration();
        //创建一个job任务对象,super.getConf()获取父类的configuration,jobName:任务名称
        Job myWordCount = Job.getInstance(configuration, "MyWordCount");
        //配置job任务的八个步骤
        //第一步:指定读取文件的方式和源文件的路径
        myWordCount.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(myWordCount,new Path(args[0]));
        //第二步:指定map阶段的处理方式,和数据类型
        myWordCount.setMapperClass(MainMapper.class);
        //设置map阶段k2的类型
        myWordCount.setMapOutputKeyClass(IntWritable.class);
        //设置map阶段v2的类型
        myWordCount.setMapOutputValueClass(Text.class);
        //第三,四,五,六,步采用默认暂时不用配置
        //第七步:指定reduce阶段的处理方式和数据类型
        myWordCount.setReducerClass(MainReduce.class);
        //设置reduce阶段k3的类型
        myWordCount.setOutputKeyClass(Text.class);
        //设置reduce阶段v3的类型
        myWordCount.setOutputValueClass(IntWritable.class);
        //第八步:设置输出类型
        myWordCount.setOutputFormatClass(TextOutputFormat.class);
        //设置输出路径
        // 6 指定job的输入原始所在目录
        FileInputFormat.setInputPaths(myWordCount,
                new Path("D:\\javaproject\\20210722_GOUP_11_GYC\\MapperReuceDemo01\\src\\main\\resources\\datas.json"));
        FileOutputFormat.setOutputPath(myWordCount,
                new Path("D:\\javaproject\\20210722_GOUP_11_GYC\\MapperReuceDemo01\\src\\main\\resources\\input"));
        //等待任务结束
        boolean b = myWordCount.waitForCompletion(true);
        System.exit(b?0:1);
    }
}

输出结果:
在这里插入图片描述

新手第一次写博客勿喷!

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-23 10:51:49  更:2021-07-23 10:51:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 13:50:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码