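Background:
Using historical US commercial airline flight data (1987 to 2008), we develop MapReduce, Pig, and Hive applications that compute flight statistics for a single year (number of flights and distance flown).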
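MapReduce project:
1. Write the MapReduce project.
2. Upload the data file to Hadoop.
3. Check that the upload succeeded; you can also browse the file from Eclipse.
4. Configure the MapReduce project and launch it.
5. You can also export flightweekDist.jar and run it on Hadoop.
6. Go to your output path and look at the results.
Simple, right? [doge]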
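To make the goal of the two jobs concrete, here is a minimal plain-Java sketch (no Hadoop involved) of the same two aggregations: flights per day of week and total distance per flight. The class name FlightStatsSketch, its method names, and the three sample rows are made up for illustration; only the field positions (0, 3, 8, 9, 18) come from the mapper code listed at the end of this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of what the two MapReduce jobs compute (hypothetical class).
class FlightStatsSketch {

    // A line is a data row when its first field (Year) parses as an integer;
    // the CSV header fails this check and is skipped.
    static boolean isDataRow(String[] fields) {
        try {
            Integer.parseInt(fields[0]);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Job 1 equivalent: key = field 3 (DayOfWeek), value = number of rows.
    static Map<String, Integer> flightsPerDayOfWeek(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (!isDataRow(f)) {
                continue;
            }
            counts.merge(f[3], 1, Integer::sum);
        }
        return counts;
    }

    // Job 2 equivalent: key = fields 8 + 9 (carrier + flight number),
    // value = sum of field 18 (Distance); unparsable distances count as 0.
    static Map<String, Integer> distancePerFlight(List<String> lines) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            if (!isDataRow(f)) {
                continue;
            }
            int miles = 0;
            try {
                miles = Integer.parseInt(f[18]);
            } catch (NumberFormatException e) {
                // keep 0, as the mapper does
            }
            totals.merge(f[8] + f[9], miles, Integer::sum);
        }
        return totals;
    }

    // Made-up sample: one header line and three data rows with 19 fields each.
    static List<String> sampleLines() {
        return Arrays.asList(
            "Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,"
                + "UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,"
                + "AirTime,ArrDelay,DepDelay,Origin,Dest,Distance",
            "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN,SFO,447",
            "1987,10,15,4,729,730,903,849,PS,1451,NA,94,79,NA,14,-1,SAN,SFO,447",
            "1987,10,14,3,1000,1000,1100,1100,PS,1500,NA,60,60,NA,0,0,SFO,LAX,NA");
    }

    public static void main(String[] args) {
        List<String> lines = sampleLines();
        System.out.println(flightsPerDayOfWeek(lines)); // {3=2, 4=1}
        System.out.println(distancePerFlight(lines));   // {PS1451=894, PS1500=0}
    }
}
```

The real jobs do the same thing at scale: the mappers emit the key/value pairs and the shared reducer performs the summing.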
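Pig project:
1. Write the Pig script, of course.
The statements look a lot like SQL, so anyone who has studied databases should be fine.
A word on the STREAM statement that follows the LOAD:
STREAM pipes the relation through an external command, so it can be used for filtering, de-duplicating, sorting, grouping and much more; it is worth reading up on if you are interested. In this script the command is `tail -n +2`, and its only job is to strip the header row from the raw data.
The remaining statements are not explained here; look them up in a book if they are unfamiliar.
2. Then happily run the script.
Once you see "success", it worked.
3. View the results directly with hadoop fs -cat.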
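The header filter deserves a closer look. `tail -n +2` is positional: it drops the first line of whatever it is given, whether or not that line is the header. The MapReduce mappers in this post take a content-based approach instead (skip any line whose Year field is not a number). The sketch below contrasts the two; HeaderFilterSketch, its methods, and the sample lines are hypothetical and only illustrate the difference.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of positional vs. content-based header removal.
class HeaderFilterSketch {

    // Positional filter: same effect as `tail -n +2` on one stream of lines.
    static List<String> dropFirstLine(List<String> lines) {
        if (lines.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(lines.subList(1, lines.size()));
    }

    // Content-based filter: keep a line only if its first field is an integer.
    static List<String> dropNonNumericYear(List<String> lines) {
        List<String> kept = new ArrayList<>();
        for (String line : lines) {
            String first = line.split(",", 2)[0];
            try {
                Integer.parseInt(first);
                kept.add(line);
            } catch (NumberFormatException e) {
                // header (or malformed) line: skip it
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> withHeader = Arrays.asList("Year,Month", "1987,10", "1987,11");
        List<String> withoutHeader = Arrays.asList("1987,12", "1988,1");
        // Both filters agree when the header really is the first line.
        System.out.println(dropFirstLine(withHeader).equals(dropNonNumericYear(withHeader))); // true
        // Without a header, the positional filter silently loses a data row.
        System.out.println(dropFirstLine(withoutHeader).size());      // 1
        System.out.println(dropNonNumericYear(withoutHeader).size()); // 2
    }
}
```

The positional version is fine as long as the command only ever sees input that starts with the header; the content-based check does not depend on that.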
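Hive project:
Hive is a bit harder, because the results also have to be written to MySQL at the end.
1. Go straight to writing the code.
It is simple: it is all JDBC, which everyone has used in Java SE.
Note: remember to start hiveserver first.
2. Then run it. The shell prints log output and the job takes a while; watching the log tells you the program is still running rather than stuck.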
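Tip:
If you still need the terminal after starting hiveserver, press Ctrl+Z to suspend it and then type bg so that it keeps running in the background. (A trick I learned from an expert.)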
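Now for the most troublesome part.
Use a UDF to write the Hive results into MySQL. (I would probably get yelled at if I did not post the function, so it is listed at the end.)
1. Add the jars (hive-contrib-0.9.0-cdh4.1.2.jar and mysql-connector-java-5.1.38.jar).
2. Write the UDF, package it as a jar and export it. (It looks roughly like the GenericUDFDBOutput class listed at the end.)
3. After exporting the jar, create the function in Hive with CREATE TEMPORARY FUNCTION.
Note: the function has to be created again in every session. If you need it often, you can put the statement in a configuration file (the Hive CLI's .hiverc startup file is the usual place), though I have not tried this myself.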
hive>select dboutput('jdbc:mysql://192.168.1.100/hive','hive','mysql','INSERT INTO flightinfodistance(flightnumber,distance) VALUES (?,?)',flightnumber,distance) from flightinfodistance;
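This statement matters, so let's go through it from left to right:
'dboutput' is the function name;
'192.168.1.100' is the MySQL host IP;
'hive' (at the end of the JDBC URL) is the MySQL database name;
'hive' (the second argument) is the MySQL user name;
'mysql' is the MySQL password;
'flightinfodistance' (in the INSERT statement) is the MySQL table name;
(flightnumber,distance) are the MySQL table columns;
flightnumber,distance (after the SQL string) are the Hive table columns;
flightinfodistance (after from) is the Hive table name.
Run the statement, then check the result in your own MySQL database.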
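How the trailing arguments line up with the ? placeholders is easy to get wrong. In the UDF source listed at the end, argument i (0-based, i >= 4) is bound with ps.setObject(i - 3, ...), so the fifth argument fills the first placeholder, the sixth fills the second, and so on. The sketch below only illustrates that mapping; DbOutputArgsSketch and its methods are hypothetical helpers, not part of Hive, and the placeholder counter is deliberately naive (it ignores quoted question marks).

```java
// Hypothetical helper illustrating how dboutput's arguments map to placeholders.
class DbOutputArgsSketch {

    // Counts '?' characters in the SQL text (naive: does not handle '?' in literals).
    static int countPlaceholders(String sql) {
        int count = 0;
        for (int i = 0; i < sql.length(); i++) {
            if (sql.charAt(i) == '?') {
                count++;
            }
        }
        return count;
    }

    // dboutput argument position (0-based, >= 4) -> JDBC parameter index (1-based).
    static int parameterIndex(int argumentPosition) {
        if (argumentPosition < 4) {
            throw new IllegalArgumentException(
                "arguments 0-3 are url, user, password and SQL, not bound values");
        }
        return argumentPosition - 3;
    }

    // True when the number of bound values equals the number of placeholders.
    static boolean argumentsMatch(String sql, int totalArguments) {
        return countPlaceholders(sql) == totalArguments - 4;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO flightinfodistance(flightnumber,distance) VALUES (?,?)";
        System.out.println(countPlaceholders(sql)); // 2
        System.out.println(parameterIndex(4));      // 1  (flightnumber)
        System.out.println(parameterIndex(5));      // 2  (distance)
        System.out.println(argumentsMatch(sql, 6)); // true
    }
}
```

For the statement above, the call has six arguments in total: four fixed ones plus flightnumber and distance, which matches the two placeholders.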
And now, tearfully, the code:
MapReduce:
package com.ssh.flight;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class FlightWeekDist {
// Job 1 mapper: emits (day of week, 1) for every data row.
public static class FlightNumMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text dateofWeek = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
try {
Integer.parseInt(fields[0]); // skip the header row, whose Year field is not a number
} catch (NumberFormatException e) {
return;
}
dateofWeek.set(fields[3]); // DayOfWeek column
context.write(dateofWeek, one);
}
}
// Job 2 mapper: emits (carrier + flight number, distance) for every data row.
public static class FlightMilesMapper extends
Mapper<Object, Text, Text, IntWritable> {
private IntWritable Miles = new IntWritable();
private Text FlightNum = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split(",");
try {
Integer.parseInt(fields[0]); // skip the header row, whose Year field is not a number
} catch (NumberFormatException e) {
return;
}
String flight = fields[8] + fields[9]; // UniqueCarrier + FlightNum
FlightNum.set(flight);
int miles = 0;
try {
miles = Integer.parseInt(fields[18]); // Distance column
} catch (NumberFormatException e) {
// distance missing or not numeric: count it as 0
}
Miles.set(miles);
context.write(FlightNum, Miles);
}
}
// Shared reducer (also used as the combiner): sums the values for each key.
public static class FlightSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
// Deletes both output directories so the jobs can be re-run.
private static void removeOutputPath(Configuration conf, String output1,
String output2) throws IOException {
FileSystem hdfs = FileSystem.get(conf);
Path path = new Path(output1);
hdfs.delete(path, true);
path = new Path(output2);
hdfs.delete(path, true);
}
// Job factories: one job counts flights per day of week, the other sums distance per flight.
private static Job createFlightNumJob(Configuration conf, String input,
String output) throws IOException {
Job job = new Job(conf, "Flight Numbers");
job.setJarByClass(FlightWeekDist.class);
job.setMapperClass(FlightNumMapper.class);
job.setCombinerClass(FlightSumReducer.class);
job.setReducerClass(FlightSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
return job;
}
private static Job createFlightMilesJob(Configuration conf, String input,
String output) throws IOException {
Job job = new Job(conf, "Flight Miles");
job.setJarByClass(FlightWeekDist.class);
job.setMapperClass(FlightMilesMapper.class);
job.setCombinerClass(FlightSumReducer.class);
job.setReducerClass(FlightSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output));
return job;
}
// The code from here on writes the results into an HTML file.
public static void transHDFSfile2local(Configuration conf, String src,
String dst) {
FileSystem hdfs;
try {
hdfs = FileSystem.get(conf);
Path path_src = new Path(src + "/part-r-00000");
Path path_dst = new Path(dst);
hdfs.copyToLocalFile(path_src, path_dst);
hdfs.close();
System.out.println("File copy success");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("copy failed:" + e.getMessage());
}
}
public static ArrayList<String> readFile(String path) {
File file = new File(path);
BufferedReader reader = null;
ArrayList<String> lines = new ArrayList<String>();
try {
reader = new BufferedReader(new FileReader(file));
String tempString = null;
while ((tempString = reader.readLine()) != null) {
// TextOutputFormat separates key and value with a tab by default
int sep = tempString.lastIndexOf('\t');
if (sep < 0) {
continue; // not a key/value line
}
lines.add(tempString.substring(0, sep));
lines.add(tempString.substring(sep + 1));
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
}
}
}
// System.out.println("lines:");
// for (int i = 0; i < lines.size(); i++) {
// System.out.println(lines.get(i));
// }
return lines;
}
public static void writeFile(String path, ArrayList<String> strs) {
try {
FileWriter writer = new FileWriter(path, true);
int m = 0;
for (int i = 0; i < strs.size(); i++) {
if (m == 0) {
writer.write("<tr>" + '\n');
}
writer.write("<td>" + strs.get(i) + "</td>" + '\n');
m++;
if (m == 2) {
writer.write("</tr>" + '\n');
m = 0;
}
}
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void writeFileFromOtherFile(String from, String dst) {
try {
File from_file = new File(from);
FileWriter writer = new FileWriter(dst, true);
BufferedReader reader = null;
reader = new BufferedReader(new FileReader(from_file));
String tempString = null;
while ((tempString = reader.readLine()) != null) {
writer.write(tempString+'\n');
}
reader.close();
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void transTxT2HTML(String txt_src, String html_src) {
writeFile(html_src, readFile(txt_src));
}
// Runs both jobs, then copies the results to local disk and builds the HTML report.
public static void main(String[] args) throws Exception {
// Example: hadoop jar flightweekDist.jar
// com.ssh.flight.FlightWeekDist ./flightcount/1987-all.csv
// ./flightcount/output1 ./flightcount/output2
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: FlightWeekDist <in> <out1> <out2>");
System.exit(2);
}
removeOutputPath(conf, otherArgs[1], otherArgs[2]);
Job job = createFlightNumJob(conf, otherArgs[0], otherArgs[1]);
job.waitForCompletion(true);
job = createFlightMilesJob(conf, otherArgs[0], otherArgs[2]);
job.waitForCompletion(true);
// copy files to local
transHDFSfile2local(conf, "/home/hoodoop/flightcount/output1",
"/home/hoodoop/flight_data/week_flight.dat");
transHDFSfile2local(conf, "/home/hoodoop/flightcount/output2",
"/home/hoodoop/flight_data/distance_flight.dat");
writeFileFromOtherFile("/home/hoodoop/html_format/before.dat", "/home/hoodoop/test.html");
transTxT2HTML("/home/hoodoop/flight_data/week_flight.dat",
"/home/hoodoop/test.html");
writeFileFromOtherFile("/home/hoodoop/html_format/middle.dat", "/home/hoodoop/test.html");
transTxT2HTML("/home/hoodoop/flight_data/distance_flight.dat",
"/home/hoodoop/test.html");
writeFileFromOtherFile("/home/hoodoop/html_format/last.dat", "/home/hoodoop/test.html");
}
}
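The last part of the class turns each reducer output line into one HTML table row. As a standalone illustration of that step, here is a minimal sketch that assumes the default key<TAB>value layout of the part-r-00000 file; HtmlRowsSketch and toHtmlRow are hypothetical names, and the sample line is made up.

```java
// Hypothetical sketch: one reducer output line -> one HTML table row.
class HtmlRowsSketch {

    // Splits "key<TAB>value" and wraps both parts in <td> cells inside a <tr>.
    // A line without a tab becomes a row with an empty second cell.
    static String toHtmlRow(String line) {
        int tab = line.indexOf('\t');
        String key = tab < 0 ? line : line.substring(0, tab);
        String value = tab < 0 ? "" : line.substring(tab + 1);
        return "<tr>\n"
            + "<td>" + key + "</td>\n"
            + "<td>" + value + "</td>\n"
            + "</tr>\n";
    }

    public static void main(String[] args) {
        // Prints a <tr> element with two cells, PS1451 and 894.
        System.out.print(toHtmlRow("PS1451\t894"));
    }
}
```

The rows produced this way are what ends up between the before.dat, middle.dat and last.dat fragments in test.html.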
Pig:
records = LOAD '1987-all.csv' USING PigStorage(',') AS
(Year:int,Month:int,DayofMonth:int,DayOfWeek:int,DepTime:int,CRSDepTime:int,ArrTime:int,CRSArrTime:int,UniqueCarrier:chararray,FlightNum:chararray,TailNum:int,ActualElapsedTime:int,CRSElapsedTime:int,AirTime:int,ArrDelay:int,DepDelay:int,Origin:chararray,Dest:chararray,Distance:int,TaxiIn:chararray,TaxiOut:chararray,Cancelled:chararray,CancellationCode:chararray,Diverted:chararray,CarrierDelay:chararray,WeatherDelay:chararray,NASDelay:chararray,SecurityDelay:chararray,LateAircraftDelay:chararray);
flight_without_first_row = STREAM records THROUGH `tail -n +2` AS (Year:int,Month:int,DayofMonth:int,DayOfWeek:int,DepTime:int,CRSDepTime:int,ArrTime:int,CRSArrTime:int,UniqueCarrier:chararray,FlightNum:chararray,TailNum:int,ActualElapsedTime:int,CRSElapsedTime:int,AirTime:int,ArrDelay:int,DepDelay:int,Origin:chararray,Dest:chararray,Distance:int,TaxiIn:chararray,TaxiOut:chararray,Cancelled:chararray,CancellationCode:chararray,Diverted:chararray,CarrierDelay:chararray,WeatherDelay:chararray,NASDelay:chararray,SecurityDelay:chararray,LateAircraftDelay:chararray);
rmf week_flight_sort;
week_sorts = Group flight_without_first_row BY DayOfWeek;
week_sort = FOREACH week_sorts GENERATE group,COUNT($1);
STORE week_sort INTO 'week_flight_sort';
rmf flight_distances_statistices;
flight_distances= Group flight_without_first_row BY CONCAT(UniqueCarrier,FlightNum);
flight_distance = FOREACH flight_distances GENERATE group,SUM($1.Distance);
STORE flight_distance INTO 'flight_distances_statistices';
Hive:
package hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class HiveJdbcTest {
private static final String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
private static final String HOST = "192.168.1.100:10021";
private static final String URL = "jdbc:hive://" + HOST + "/default";
public static void main(String[] args) throws Exception {
Class.forName(driverName);
Connection conn = DriverManager.getConnection(URL, "", "");
Statement stmt = conn.createStatement();
String hql = "";
// Number of flights per day of week
hql = "insert overwrite table flightinfosort select DayOfWeek,count(*) from FlightInfo1987 group by DayOfWeek ";
stmt.execute(hql);
// Total distance per flight (carrier + flight number)
hql = "insert overwrite table flightinfodistance select concat(UniqueCarrier,FlightNum),sum(Distance) from FlightInfo1987 group by concat(UniqueCarrier,FlightNum)";
stmt.execute(hql);
stmt.close();
conn.close();
}
}
UDF function GenericUDFDBOutput:
package org.apache.Hadoop.hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.UDFType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
/**
* GenericUDFDBOutput is designed to output data directly from Hive to a JDBC
* datastore. This UDF is useful for exporting small to medium summaries that
* have a unique key.
*
* Due to the nature of hadoop, individual mappers, reducers or entire jobs can
* fail. If a failure occurs a mapper or reducer may be retried. This UDF has no
* way of detecting failures or rolling back a transaction. Consequently, you
* should only use this to export to a table with a unique key. The unique
* key should safeguard against duplicate data.
*
* Use hive's ADD JAR feature to add your JDBC Driver to the distributed cache,
* otherwise GenericUDFDBoutput will fail.
*/
@Description(name = "dboutput",
value = "_FUNC_(jdbcstring,username,password,preparedstatement,[arguments])"
+ " - sends data to a jdbc driver",
extended = "argument 0 is the JDBC connection string\n"
+ "argument 1 is the user name\n"
+ "argument 2 is the password\n"
+ "argument 3 is an SQL query to be used in the PreparedStatement\n"
+ "argument (4-n) The remaining arguments must be primitive and are "
+ "passed to the PreparedStatement object\n")
@UDFType(deterministic = false)
public class GenericUDFDBOutput extends GenericUDF {
private static final Log LOG = LogFactory
.getLog(GenericUDFDBOutput.class.getName());
private transient ObjectInspector[] argumentOI;
private transient Connection connection = null;
private String url;
private String user;
private String pass;
private final IntWritable result = new IntWritable(-1);
/**
* @param arguments
* argument 0 is the JDBC connection string argument 1 is the user
* name argument 2 is the password argument 3 is an SQL query to be
* used in the PreparedStatement argument (4-n) The remaining
* arguments must be primitive and are passed to the
* PreparedStatement object
*/
@Override
public ObjectInspector initialize(ObjectInspector[] arguments)
throws UDFArgumentTypeException {
argumentOI = arguments;
// this should be connection url,username,password,query,column1[,columnn]*
for (int i = 0; i < 4; i++) {
if (arguments[i].getCategory() == ObjectInspector.Category.PRIMITIVE) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) arguments[i]);
if (!(poi.getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING)) {
throw new UDFArgumentTypeException(i,
"The argument of function should be \""
+ Constants.STRING_TYPE_NAME + "\", but \""
+ arguments[i].getTypeName() + "\" is found");
}
}
}
for (int i = 4; i < arguments.length; i++) {
if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(i,
"The argument of function should be primitive" + ", but \""
+ arguments[i].getTypeName() + "\" is found");
}
}
return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
}
/**
* @return 0 on success, 1 if the SQL statement fails, 2 if the connection cannot be opened
*/
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
url = ((StringObjectInspector) argumentOI[0])
.getPrimitiveJavaObject(arguments[0].get());
user = ((StringObjectInspector) argumentOI[1])
.getPrimitiveJavaObject(arguments[1].get());
pass = ((StringObjectInspector) argumentOI[2])
.getPrimitiveJavaObject(arguments[2].get());
try {
connection = DriverManager.getConnection(url, user, pass);
} catch (SQLException ex) {
LOG.error("Driver loading or connection issue", ex);
result.set(2);
}
if (connection != null) {
try {
PreparedStatement ps = connection
.prepareStatement(((StringObjectInspector) argumentOI[3])
.getPrimitiveJavaObject(arguments[3].get()));
for (int i = 4; i < arguments.length; ++i) {
PrimitiveObjectInspector poi = ((PrimitiveObjectInspector) argumentOI[i]);
ps.setObject(i - 3, poi.getPrimitiveJavaObject(arguments[i].get()));
}
ps.execute();
ps.close();
result.set(0);
} catch (SQLException e) {
LOG.error("Underlying SQL exception", e);
result.set(1);
} finally {
try {
connection.close();
} catch (Exception ex) {
LOG.error("Underlying SQL exception during close", ex);
}
}
}
return result;
}
@Override
public String getDisplayString(String[] children) {
StringBuilder sb = new StringBuilder();
sb.append("dboutput(");
if (children.length > 0) {
sb.append(children[0]);
for (int i = 1; i < children.length; i++) {
sb.append(",");
sb.append(children[i]);
}
}
sb.append(")");
return sb.toString();
}
}
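Because dboutput returns a status code for every row, the output of the select is a column of integers. Reading the evaluate method above, the codes are 0 (row written), 1 (SQL exception while preparing or executing the statement) and 2 (the JDBC connection could not be opened). A small, hypothetical decoder such as the one below can make that output easier to read; DbOutputStatusSketch is not part of Hive.

```java
// Hypothetical decoder for the integer that dboutput returns per row.
class DbOutputStatusSketch {

    // Maps the codes set in GenericUDFDBOutput.evaluate to short descriptions.
    static String describe(int code) {
        switch (code) {
            case 0:
                return "row written";
            case 1:
                return "SQL error while preparing or executing the statement";
            case 2:
                return "could not open the JDBC connection";
            default:
                return "unknown status " + code;
        }
    }

    public static void main(String[] args) {
        for (int code : new int[] {0, 1, 2}) {
            System.out.println(code + ": " + describe(code));
        }
    }
}
```

A column of zeros therefore means every row reached MySQL; anything else is worth checking in the task logs, where the UDF logs the underlying exception.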