一、数据采集
[root@hadoop01 conf]# cd /home/software/flume/conf/
[root@hadoop01 conf]# cp demo.conf yiqing.conf
[root@hadoop01 conf]# vim yiqing.conf
# Flume agent "a1": one source (r1) feeding one channel (c1) drained by one sink (k1).

# Components that make up the agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1: spooling-directory source reading from /home/project/epidemic
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/project/epidemic
a1.sources.r1.channels = c1

# Channel c1: in-memory buffer between the source and the sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink k1: writes events to HDFS under /flume on hadoop01
# fileType=DataStream is the setting the troubleshooting section of this
# document relies on to avoid garbled file content in HDFS.
# NOTE(review): only hdfs.rollCount is set here; hdfs.rollInterval and
# hdfs.rollSize are left unset, so whatever their defaults are still apply -
# confirm that rolling purely by event count was not the intent.
# NOTE(review): the Hive tables later in this document read from
# /epidemic/<table>, not /flume - confirm how the data gets from one to the other.
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://hadoop01:9000/flume
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollCount = 30
二、对数据进行操作(使用hive)
-- Schema setup for the epidemic analysis (HiveQL).
-- Every statement is terminated with ';' so the script can be run as-is;
-- without the terminator Hive glues a statement onto the next one and fails
-- to parse it. IF NOT EXISTS makes the whole section safe to re-run.
-- NOTE(review): the JDBC test class later in this document connects to a
-- database named `epidemic`, not `yiqing` - confirm which name is intended.
CREATE DATABASE IF NOT EXISTS yiqing;
-- Switch to the new database so the tables below are not created in `default`.
USE yiqing;

-- Raw input tables. All are EXTERNAL, space-delimited text files living under
-- /epidemic/<table> in HDFS. Column names are pinyin; presumably
-- bingren = patients, dache = car rides, hangban = flights, tielu = railway
-- (TODO confirm against the data files).
CREATE EXTERNAL TABLE IF NOT EXISTS bingren (
    id      INT,
    name    STRING,
    xiaoqu  STRING,
    checi   STRING,
    zuowei  STRING,
    quezhen INT,
    riqi    DATE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LOCATION '/epidemic/bingren';

CREATE EXTERNAL TABLE IF NOT EXISTS dache (
    name      STRING,
    chepaihao STRING,
    qidian    STRING,
    zhongdian STRING,
    renshu    INT,
    riqi      DATE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LOCATION '/epidemic/dache';

CREATE EXTERNAL TABLE IF NOT EXISTS hangban (
    name       STRING,
    hangbanhao STRING,
    qidian     STRING,
    zhongdian  STRING,
    renshu     INT,
    riqi       DATE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LOCATION '/epidemic/hangban';

CREATE EXTERNAL TABLE IF NOT EXISTS tielu (
    name      STRING,
    checihao  STRING,
    qidian    STRING,
    zhongdian STRING,
    zuowei    STRING,
    renshu    INT,
    riqi      DATE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
LOCATION '/epidemic/tielu';

-- Result table: one row per trip taken by a confirmed patient, partitioned by
-- the source table the trip came from (chuxing = 'tielu' | 'hangban' | 'dache').
CREATE TABLE IF NOT EXISTS hangban2 (
    name   STRING,
    renshu INT,
    riqi   STRING
)
PARTITIONED BY (chuxing STRING);
-- Rebuild the three partitions of hangban2 from the raw trip tables.
-- A trip row is kept when its passenger name appears in bingren with
-- quezhen = 1. The trip identifier (train number / flight number / plate
-- number) is stored in hangban2.name.
-- Columns are qualified with table aliases because the select list aliases
-- the identifier AS name while the filter is on the source table's own
-- `name` column; the qualification makes that distinction explicit.
-- INSERT OVERWRITE replaces the partition, so this section is safe to re-run.
INSERT OVERWRITE TABLE hangban2 PARTITION (chuxing = 'tielu')
SELECT
    t.checihao AS name,
    t.renshu,
    t.riqi
FROM tielu t
WHERE t.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT OVERWRITE TABLE hangban2 PARTITION (chuxing = 'hangban')
SELECT
    h.hangbanhao AS name,
    h.renshu,
    h.riqi
FROM hangban h
WHERE h.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT OVERWRITE TABLE hangban2 PARTITION (chuxing = 'dache')
SELECT
    d.chepaihao AS name,
    d.renshu,
    d.riqi
FROM dache d
WHERE d.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

-- Total of renshu over every collected trip row.
-- NOTE(review): a trip shared by several confirmed patients appears once per
-- patient, so its renshu is added more than once - confirm this is intended.
SELECT SUM(renshu) FROM hangban2;
-- Result table: every origin (qidian) and destination (zhongdian) touched by a
-- trip of a confirmed patient, partitioned by kind of area
-- (leixing = 'chezhanqu' for dache rows, 'shengfen' for tielu/hangban rows).
CREATE TABLE IF NOT EXISTS ganranquyu (
    quyu STRING
)
PARTITIONED BY (leixing STRING);

-- Each statement below is terminated with ';' so the section runs as a script.
-- These are INSERT INTO (append) statements: re-running the section adds the
-- same rows again. The final query uses DISTINCT, so its output is unaffected.

-- Origins
INSERT INTO TABLE ganranquyu PARTITION (leixing = 'chezhanqu')
SELECT d.qidian
FROM dache d
WHERE d.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT INTO TABLE ganranquyu PARTITION (leixing = 'shengfen')
SELECT t.qidian
FROM tielu t
WHERE t.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT INTO TABLE ganranquyu PARTITION (leixing = 'shengfen')
SELECT h.qidian
FROM hangban h
WHERE h.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

-- Destinations
INSERT INTO TABLE ganranquyu PARTITION (leixing = 'chezhanqu')
SELECT d.zhongdian
FROM dache d
WHERE d.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT INTO TABLE ganranquyu PARTITION (leixing = 'shengfen')
SELECT t.zhongdian
FROM tielu t
WHERE t.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

INSERT INTO TABLE ganranquyu PARTITION (leixing = 'shengfen')
SELECT h.zhongdian
FROM hangban h
WHERE h.name IN (
    SELECT b.name
    FROM bingren b
    WHERE b.quezhen = 1
);

-- Distinct list of affected areas across both partitions.
SELECT DISTINCT quyu FROM ganranquyu;
三、对数据进行操作(使用 eclipse)
package cn.edu.epidemic;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.junit.Test;
/**
 * JUnit-driven walkthrough of the epidemic analysis against HiveServer2 over JDBC.
 *
 * <p>The tests are steps of one workflow and rely on each other's side effects:
 * create the database, create the tables, fill the two result tables, then
 * print the results. Run them in that order.
 *
 * <p>Every JDBC resource is opened in a try-with-resources block so that the
 * connection, statement and result set are closed even when a statement fails.
 */
public class EpidemicTest {

    private static final String DRIVER = "org.apache.hive.jdbc.HiveDriver";
    private static final String SERVER_URL = "jdbc:hive2://192.168.232.129:10000";
    private static final String DATABASE_URL = SERVER_URL + "/epidemic";
    private static final String USER = "root";
    private static final String PASSWORD = "root";

    /** Sub-query selecting the names of patients whose quezhen flag is 1. */
    private static final String CONFIRMED_NAMES = "(select name from bingren where quezhen=1)";

    /** Loads the Hive JDBC driver and opens a connection to the given URL. */
    private static Connection open(String url) throws ClassNotFoundException, SQLException {
        Class.forName(DRIVER);
        return DriverManager.getConnection(url, USER, PASSWORD);
    }

    /** Runs the given statements in order on one connection, closing it afterwards. */
    private static void runAll(String url, String... statements)
            throws ClassNotFoundException, SQLException {
        try (Connection conn = open(url); Statement stat = conn.createStatement()) {
            for (String sql : statements) {
                stat.executeUpdate(sql);
            }
        }
    }

    /** Builds the INSERT OVERWRITE that refills one partition of hangban2. */
    private static String insertTrips(String chuxing, String idColumn, String table) {
        return "INSERT overwrite table hangban2 PARTITION(chuxing='" + chuxing + "') "
                + "select " + idColumn + " as name,renshu,riqi from " + table + " "
                + "WHERE name in " + CONFIRMED_NAMES;
    }

    /** Builds the INSERT INTO that appends one column of one trip table to ganranquyu. */
    private static String insertAreas(String leixing, String column, String table) {
        return "INSERT INTO table ganranquyu PARTITION(leixing='" + leixing + "') "
                + "select " + column + " from " + table + " WHERE name in " + CONFIRMED_NAMES;
    }

    /**
     * Creates the {@code epidemic} database when it does not exist yet.
     *
     * <p>Uses {@code IF NOT EXISTS} instead of probing with {@code USE} and
     * catching the failure, so connection or permission errors are reported
     * instead of being mistaken for "database missing".
     */
    @Test
    public void testCreateDatabase() throws ClassNotFoundException, SQLException {
        runAll(SERVER_URL, "create database if not exists epidemic");
    }

    /**
     * Creates the four external input tables and the two result tables when
     * they do not exist yet.
     *
     * <p>{@code IF NOT EXISTS} replaces the former "select * and create on any
     * exception" probe, which read the table just to test for its existence
     * and hid unrelated failures.
     */
    @Test
    public void testCreateTable() throws ClassNotFoundException, SQLException {
        runAll(DATABASE_URL,
                "create external table if not exists bingren (id int,name string,xiaoqu string,checi string,"
                        + "zuowei string ,quezhen int,riqi date) row format delimited fields terminated "
                        + "by ' ' location '/epidemic/bingren'",
                "create external table if not exists dache (name string,chepaihao string,qidian string,"
                        + "zhongdian string ,renshu int,riqi date) row format delimited fields terminated "
                        + "by ' ' location '/epidemic/dache'",
                "create external table if not exists hangban (name string,hangbanhao string,qidian string,"
                        + "zhongdian string ,renshu int,riqi date) row format delimited fields terminated "
                        + "by ' ' location '/epidemic/hangban'",
                "create external table if not exists tielu (name string,checihao string,qidian string,"
                        + "zhongdian string ,zuowei string,renshu int,riqi date) row format delimited fields "
                        + "terminated by ' ' location '/epidemic/tielu'",
                "create table if not exists hangban2 (name string,renshu int, riqi string) partitioned "
                        + "by (chuxing string)",
                "create table if not exists ganranquyu(quyu string) partitioned by (leixing string) ");
    }

    /**
     * Refills the three partitions of {@code hangban2} with the trips whose
     * passenger name belongs to a confirmed patient. Each statement overwrites
     * its partition, so the test can be re-run.
     */
    @Test
    public void testInsertHangban2() throws ClassNotFoundException, SQLException {
        runAll(DATABASE_URL,
                insertTrips("tielu", "checihao", "tielu"),
                insertTrips("hangban", "hangbanhao", "hangban"),
                insertTrips("dache", "chepaihao", "dache"));
    }

    /**
     * Appends the origins and destinations of confirmed patients' trips to
     * {@code ganranquyu}. These are INSERT INTO statements, so re-running the
     * test appends the same rows again; the result query de-duplicates them.
     */
    @Test
    public void testInsertGanranquyu() throws ClassNotFoundException, SQLException {
        runAll(DATABASE_URL,
                insertAreas("chezhanqu", "qidian", "dache"),
                insertAreas("shengfen", "qidian", "tielu"),
                insertAreas("shengfen", "qidian", "hangban"),
                insertAreas("shengfen", "zhongdian", "tielu"),
                insertAreas("shengfen", "zhongdian", "hangban"),
                insertAreas("chezhanqu", "zhongdian", "dache"));
    }

    /**
     * Prints the total of {@code renshu} over all rows of {@code hangban2}.
     *
     * <p>The sum is computed by Hive and read as a {@code long}, so rows are
     * not shipped to the client and the total cannot overflow an {@code int}.
     * COALESCE turns the NULL that SUM yields for an empty table into 0.
     */
    @Test
    public void testResultHangban2() throws ClassNotFoundException, SQLException {
        try (Connection conn = open(DATABASE_URL);
                Statement stat = conn.createStatement();
                ResultSet rs = stat.executeQuery("select coalesce(sum(renshu), 0) from hangban2")) {
            long sum = rs.next() ? rs.getLong(1) : 0L;
            System.out.println("共有人数: " + sum);
        }
    }

    /** Prints every distinct {@code quyu} value stored in {@code ganranquyu}, one per line. */
    @Test
    public void testResultGanranquyu() throws ClassNotFoundException, SQLException {
        try (Connection conn = open(DATABASE_URL);
                Statement stat = conn.createStatement();
                ResultSet rs = stat.executeQuery("SELECT DISTINCT quyu FROM ganranquyu")) {
            while (rs.next()) {
                System.out.println(rs.getString("quyu"));
            }
        }
    }
}
四、出现的问题及解决方法
(一)FLUME采集数据到HDFS,文件内容出现乱码
原因:HDFS sink 的 hdfs.fileType 默认值是 SequenceFile,写出的是二进制序列文件,直接查看时内容会显示为乱码。解决方法:检查 sink 配置中是否包含下面这一行,并确认属性名和取值的拼写、大小写完全正确: a1.sinks.k1.hdfs.fileType=DataStream
|