IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 第四周mapreduce实战之空气质量 -> 正文阅读

[大数据]第四周mapreduce实战之空气质量

?AirBean.java

package org.jsoup;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class AirBean implements Writable{
    String cityString;
    float pm25;
    float iaqi;

    public AirBean() {
    }

    public AirBean(String cityString, float pm25) {
        this.cityString = cityString;
        this.pm25 = pm25;
        float bpHi = 0,bpLo = 0,iaqiHi = 0,iaqiLo = 0;

        long[] pm25limits = {0,35,75,115,150,250,350,500};
        long[] airLimits = {0,50,100,150,200,300,400,500};

        for (int i = 0; i < 7; i++) {
            if (pm25>=pm25limits[i]&pm25<pm25limits[i+1]) {
                bpHi = pm25limits[i+1];
                bpLo = pm25limits[i];
                iaqiHi = airLimits[i+1];
                iaqiLo = airLimits[i];
                this.iaqi = ((iaqiHi-iaqiLo)/(bpHi-bpLo))*(pm25-bpLo)+iaqiLo;
            }

        }

    }

    public String getCityString() {
        return cityString;
    }

    public void setCityString(String cityString) {
        this.cityString = cityString;
    }

    public float getPm25() {
        return pm25;
    }

    public void setPm25(long pm25) {
        this.pm25 = pm25;
    }

    public float getIaqi() {
        return iaqi;
    }

    public void setIaqi(long iaqi) {
        this.iaqi = iaqi;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(cityString);
        out.writeFloat(pm25);
        out.writeFloat(iaqi);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        cityString = in.readUTF();
        pm25 = in.readFloat();
        iaqi = in.readFloat();
    }
}

AirMapper.java?

package org.jsoup;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


public class AirMapper extends Mapper<LongWritable, Text, Text, AirBean>{

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if(key.toString().equals('0')){

        }else{
            String line = value.toString();
            String[] fields = line.split(",");
            String cityString = fields[16];
            float pm25 = Long.parseLong(fields[3]);
            context.write(new Text(cityString),new AirBean(cityString, pm25));
        }

    }
}

AirReducer.java

package org.jsoup;


import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class AirReducer extends Reducer<Text, AirBean, Text, Text>{

    @Override
    protected void reduce(Text key, Iterable<AirBean> values, Context context) throws IOException, InterruptedException {
        float pm25Iaqi_av = 0f;
        float pm25Iaqi_sum = 0f;
        float count = 0f;
        for (AirBean value : values) {
            float iaqiLong = value.getIaqi();
            pm25Iaqi_sum = pm25Iaqi_sum + iaqiLong;
            count = count + 1;
        }
        pm25Iaqi_av = pm25Iaqi_sum/count;
        context.write(key,new Text(String.valueOf(pm25Iaqi_av)));
    }
}

AirRunner.java

package org.jsoup;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;


public class AirRunner {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(AirRunner.class);

        job.setMapperClass(AirMapper.class);
        job.setReducerClass(AirReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AirBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);


        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.100.100:9000/input"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://192.168.100.100:9000/output") );

        job.waitForCompletion(true);
    }
}

GetFile.java

package org.jsoup;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 i am cloud
 1. @author Cloud
 */
public class GetFile {
    public static void main(String[] args) throws IOException{
        Configuration conf = new Configuration();
        //192.168.172.5这个是你的Linux中的master的ip地址,你自己换。
        conf.set("fs.defaultFS","hdfs://192.168.100.100:9000");
        FileSystem file = FileSystem.get(conf);
        file.copyFromLocalFile(new Path("/home/PM25city.txt"), new Path("/input"));
        //前面的Path是计算机的路径位置,后面是Hadoop路径位置
        file.close();
    }
}

OrderBean.java

package org.jsoup;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 *
 * @author Cloud
 */
public class OrderBean {
    public static void main(String[] args) throws IOException {
        String path = "hdfs://192.168.100.100:9000/output/part-r-00000" ;
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(path));
        InputStreamReader isr = new InputStreamReader(hdfsInStream, "utf-8");
        BufferedReader br = new BufferedReader(isr);
        String line;
        //使用map保存,再排序
        HashMap<String,Float> iaqiMap = new HashMap<>();
        while ((line = br.readLine()) != null) {
            String[] val = line.split("\\s+");
            String city = val[0];
            float iaqi = Float.parseFloat(val[1]);
            iaqiMap.put(city,iaqi);
        }
        iaqiMap.entrySet()
                .stream()
                .sorted(Map.Entry.<String,Float>comparingByValue())
                .forEach(System.out::println);
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:09:43 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:49:42-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码