AirBean.java
package org.jsoup;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class AirBean implements Writable {

    String cityString;
    float pm25;
    float iaqi;

    public AirBean() {
    }

    public AirBean(String cityString, float pm25) {
        this.cityString = cityString;
        this.pm25 = pm25;
        // Find the PM2.5 breakpoint band that brackets the concentration and
        // linearly interpolate the corresponding IAQI value.
        float bpHi = 0, bpLo = 0, iaqiHi = 0, iaqiLo = 0;
        long[] pm25limits = {0, 35, 75, 115, 150, 250, 350, 500};
        long[] airLimits = {0, 50, 100, 150, 200, 300, 400, 500};
        for (int i = 0; i < 7; i++) {
            if (pm25 >= pm25limits[i] && pm25 < pm25limits[i + 1]) {
                bpHi = pm25limits[i + 1];
                bpLo = pm25limits[i];
                iaqiHi = airLimits[i + 1];
                iaqiLo = airLimits[i];
                this.iaqi = ((iaqiHi - iaqiLo) / (bpHi - bpLo)) * (pm25 - bpLo) + iaqiLo;
            }
        }
    }

    public String getCityString() {
        return cityString;
    }

    public void setCityString(String cityString) {
        this.cityString = cityString;
    }

    public float getPm25() {
        return pm25;
    }

    public void setPm25(float pm25) {
        this.pm25 = pm25;
    }

    public float getIaqi() {
        return iaqi;
    }

    public void setIaqi(float iaqi) {
        this.iaqi = iaqi;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(cityString);
        out.writeFloat(pm25);
        out.writeFloat(iaqi);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read back in the same order they were written.
        cityString = in.readUTF();
        pm25 = in.readFloat();
        iaqi = in.readFloat();
    }
}
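For reference, the constructor applies the linear-interpolation formula IAQI = (IAQIhi - IAQIlo) / (BPhi - BPlo) * (C - BPlo) + IAQIlo. A minimal sketch of the arithmetic for a hypothetical PM2.5 reading of 60 (the value and city name are made up for illustration):

// Hypothetical reading: PM2.5 = 60 falls in the [35, 75) breakpoint band,
// which maps onto the [50, 100) IAQI band.
AirBean bean = new AirBean("SomeCity", 60f);
// IAQI = ((100 - 50) / (75 - 35)) * (60 - 35) + 50 = 81.25
System.out.println(bean.getIaqi()); // prints 81.25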
AirMapper.java
package org.jsoup;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AirMapper extends Mapper<LongWritable, Text, Text, AirBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The key is the byte offset of the line; offset 0 is the CSV header row, which is skipped.
        if (key.get() == 0) {
            return;
        }
        String line = value.toString();
        String[] fields = line.split(",");
        String cityString = fields[16];
        float pm25 = Float.parseFloat(fields[3]);
        context.write(new Text(cityString), new AirBean(cityString, pm25));
    }
}
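The mapper assumes each record has at least 17 comma-separated columns, with the PM2.5 concentration at index 3 and the city name at index 16; the actual column layout of PM25city.txt is not shown here, so the line below is purely illustrative:

// Made-up record with 17 fields; only indexes 3 and 16 matter to the mapper.
String line = "2020-01-01,c1,c2,42.5,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,Beijing";
String[] fields = line.split(",");
System.out.println(fields[3]);  // "42.5"    -> parsed as the PM2.5 value
System.out.println(fields[16]); // "Beijing" -> becomes the map output key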
AirReducer.java
package org.jsoup;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AirReducer extends Reducer<Text, AirBean, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<AirBean> values, Context context) throws IOException, InterruptedException {
        // Average the PM2.5 IAQI values collected for each city.
        float pm25IaqiSum = 0f;
        int count = 0;
        for (AirBean value : values) {
            pm25IaqiSum += value.getIaqi();
            count++;
        }
        float pm25IaqiAvg = pm25IaqiSum / count;
        context.write(key, new Text(String.valueOf(pm25IaqiAvg)));
    }
}
AirRunner.java
package org.jsoup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

public class AirRunner {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(AirRunner.class);
        job.setMapperClass(AirMapper.class);
        job.setReducerClass(AirReducer.class);
        // The mapper emits <Text, AirBean>, the reducer emits <Text, Text>.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AirBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.100.100:9000/input"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.100.100:9000/output"));
        job.waitForCompletion(true);
    }
}
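One practical note: FileOutputFormat refuses to start a job whose output directory already exists, so a rerun fails until /output is removed. A small optional sketch that could be placed before waitForCompletion, reusing the addresses above:

// Requires: import java.net.URI; import org.apache.hadoop.fs.FileSystem;
FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.100.100:9000"), conf);
Path output = new Path("/output");
if (fs.exists(output)) {
    fs.delete(output, true); // recursive delete so the job can be resubmitted
}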
GetFile.java
package org.jsoup;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Uploads the local PM2.5 data file into HDFS.
 *
 * @author Cloud
 */
public class GetFile {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // The address is that of the Hadoop master node on your Linux machine; replace it with your own.
        conf.set("fs.defaultFS", "hdfs://192.168.100.100:9000");
        FileSystem file = FileSystem.get(conf);
        // The first Path is the local file system location, the second is the HDFS location.
        file.copyFromLocalFile(new Path("/home/PM25city.txt"), new Path("/input"));
        file.close();
    }
}
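For the reverse direction, the same FileSystem API can pull the job output back to the local machine once AirRunner has finished; a minimal sketch, assuming the same master address and a made-up local destination path:

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.100.100:9000");
FileSystem fs = FileSystem.get(conf);
// HDFS source first, local destination second (the local path is hypothetical).
fs.copyToLocalFile(new Path("/output/part-r-00000"), new Path("/home/result.txt"));
fs.close();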
OrderBean.java
package org.jsoup;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Reads the job output from HDFS and prints the cities sorted by average IAQI.
 *
 * @author Cloud
 */
public class OrderBean {

    public static void main(String[] args) throws IOException {
        String path = "hdfs://192.168.100.100:9000/output/part-r-00000";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(path), conf);
        FSDataInputStream hdfsInStream = fs.open(new Path(path));
        InputStreamReader isr = new InputStreamReader(hdfsInStream, "utf-8");
        BufferedReader br = new BufferedReader(isr);
        String line;
        // Collect the results into a map, then sort them by value.
        HashMap<String, Float> iaqiMap = new HashMap<>();
        while ((line = br.readLine()) != null) {
            String[] val = line.split("\\s+");
            String city = val[0];
            float iaqi = Float.parseFloat(val[1]);
            iaqiMap.put(city, iaqi);
        }
        br.close();
        iaqiMap.entrySet()
                .stream()
                .sorted(Map.Entry.<String, Float>comparingByValue())
                .forEach(System.out::println);
    }
}
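The stream above prints cities in ascending IAQI order (cleanest air first). If a worst-first ranking is wanted instead, the comparator can simply be reversed; a sketch reusing the same map:

// Requires: import java.util.Comparator;
iaqiMap.entrySet()
        .stream()
        .sorted(Map.Entry.<String, Float>comparingByValue(Comparator.reverseOrder()))
        .forEach(System.out::println);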