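Big Data Computing Technology: Course Project
Project Objective
Weather data analysis: write a web crawler to collect weather data from https://www.tianqi.com (click "天气" -> "历史天气", that is, Weather -> Historical Weather), store the collected data in HDFS, and then analyze it with MapReduce and Hive.
Overall Workflow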
Flowchart: Start -> Crawl data from tianqi.com -> Save the data in MySQL -> Migrate the data with Sqoop -> Offline analysis with MapReduce and Hive -> Data visualization -> End
Experimental Steps
Crawler Program
Program Flow
Flowchart: Start -> mainUI -> Enter database management? (yes/no) -> databaseUI -> Perform a database operation? (yes/no) -> Database operation -> Run the crawler? (yes/no) -> Crawler operation -> End
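Project Directory
- mainUI.py: provides the initial interface
- databaseUI.py: provides the database operation interface
- databaseOperate.py: provides the concrete database operations
  - query_tables(): queries the database and returns the tables in the WeatherData schema
  - query_tables_information(table_name): takes table_name (String) and returns the data in that table
  - delete_table_information(table_name): takes table_name (String), deletes the data in that table, and returns a bool
  - insert_information(table_name, data): takes table_name (String) and data (list) and inserts the data into the given table
- reptile.py: the crawler itself; it crawls the past year of weather data for the cities in the city table and stores it in MySQL
- lib directory
  - output_file: exported SQL files, including the table creation and data import scripts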
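The four helpers in databaseOperate.py are described above only by name and parameters. The sketch below shows one way they might behave; it is not the project's code. It assumes Python's built-in sqlite3 with an in-memory database in place of MySQL so that it runs without a server, and the extra conn parameter and the two-column city layout are hypothetical.

```python
import sqlite3

# Assumption: sqlite3 stands in for MySQL, and every helper takes an explicit
# connection (the conn parameter is hypothetical).

def query_tables(conn):
    """Return the names of all tables in the database."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    ).fetchall()
    return [name for (name,) in rows]

def _check_table(conn, table_name):
    # Table names cannot be bound as SQL parameters, so validate them first.
    if table_name not in query_tables(conn):
        raise ValueError(f"unknown table: {table_name}")

def query_tables_information(conn, table_name):
    """Return every row of the given table."""
    _check_table(conn, table_name)
    return conn.execute(f'SELECT * FROM "{table_name}"').fetchall()

def delete_table_information(conn, table_name):
    """Delete all rows of the given table; return True on success."""
    try:
        _check_table(conn, table_name)
        conn.execute(f'DELETE FROM "{table_name}"')
        conn.commit()
        return True
    except (ValueError, sqlite3.Error):
        return False

def insert_information(conn, table_name, data):
    """Insert one row (a list of column values) into the given table."""
    _check_table(conn, table_name)
    placeholders = ", ".join("?" for _ in data)
    conn.execute(f'INSERT INTO "{table_name}" VALUES ({placeholders})', data)
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE city (cityID INTEGER, cityName TEXT)")
insert_information(conn, "city", [1, "Beijing"])
print(query_tables(conn), query_tables_information(conn, "city"))
```

Validating the table name against the list of existing tables keeps the string-formatted SQL from accepting arbitrary input, which matters because placeholders only work for values.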
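Crawl Results
- city table
- airQuality table
- weather table
Note: for demonstration purposes, only the three cities entered were crawled, and only the data from 2021-4-1 to 2022-3-31 was collected, 1095 records in total.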
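The record count can be checked with a short calculation: the date range is inclusive on both ends and contains no leap day, so each city contributes 365 daily records. A minimal check, assuming one record per city per day:

```python
from datetime import date

start = date(2021, 4, 1)
end = date(2022, 3, 31)

# Both endpoints are included, hence the +1.
days = (end - start).days + 1
cities = 3
total = days * cities
print(days, total)
```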
Installing and Using Sqoop
Install ZooKeeper
Change the data directory
dataDir=/usr/local/zookeeper/zkdata
Run ZooKeeper
Check the running status
Install Sqoop
Configure the environment (sqoop-env.sh)
export HADOOP_COMMON_HOME=/usr/local/hadoop
export HADOOP_MAPRED_HOME=/usr/local/hadoop
export HIVE_HOME=/usr/local/hive
export ZOOKEEPER_HOME=/usr/local/zookeeper
export ZOOCFGDIR=/usr/local/zookeeper
export HBASE_HOME=/usr/local/hbase
Add the MySQL JDBC driver dependency
sudo cp mysql-connector-java-8.0.29.jar /usr/local/sqoop1.4.6/lib/
Verify Sqoop
Test the database connection
bin/sqoop list-databases --connect jdbc:mysql://127.0.0.1:3306/ --username root --password 123456
Configuration for viewing HDFS in a browser
<property>
<name>dfs.http.address</name>
<value>0.0.0.0:50070</value>
</property>
Hadoop pseudo-distributed mode
hadoop/etc/hadoop/mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.admin.user.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=$HADOOP_COMMON_HOME</value>
</property>
</configuration>
hadoop/etc/hadoop/yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
Start the services
sbin/start-all.sh
Import the data
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table city \
--target-dir /user/WeatherData/city \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t"
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table weather \
--target-dir /user/WeatherData/weather \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t"
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table airQuality \
--target-dir /user/WeatherData/airQuality \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t"
Download one of the imported files to inspect it
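Because the imports use --fields-terminated-by "\t", each line of a downloaded file is one row with tab-separated columns. The sketch below parses such text for the city table; the cityID and cityName column order is assumed from the Hive table definition, and the second sample row is a made-up placeholder.

```python
import csv
import io

def parse_city_rows(text):
    """Parse tab-separated city rows into dictionaries."""
    # QUOTE_NONE: treat quote characters as ordinary text, since the
    # import commands do not request any field enclosing.
    reader = csv.reader(io.StringIO(text), delimiter="\t", quoting=csv.QUOTE_NONE)
    return [{"cityID": int(row[0]), "cityName": row[1]} for row in reader if row]

# Hypothetical sample content of a downloaded file.
sample = "1\tBeijing\n2\tExampleCity\n"
rows = parse_city_rows(sample)
print(rows)
```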
Writing, Deploying, and Running MapReduce
Local Testing
Environment Setup
- Create a Maven project named MapReduceDemo
- Add the following dependencies to pom.xml
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
- In the project's src/main/resources directory, create a file named log4j.properties with the following content
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
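- Create the package com.owem.mapreduce.weatherAnalysis
Writing the Program
- Write the Bean class for the temperature analysis (fields and method bodies)
package com.owem.mapreduce.weatherAnalysis;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class WeatherAnalysisBean implements Writable {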
private Date date;
private int minTemp;
private int maxTemp;
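// Per-month sums (averages after computeTemp) of the daily minimum and maximum temperatures.
private float[] minTempArrays = new float[12];
private float[] maxTempArrays = new float[12];
// One-hot flags marking which of the 24 known weather labels the current record has.
private boolean[] weather = new boolean[24];
// Number of days counted for each of the 24 weather labels.
private int[] weatherArrays = new int[24];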
public WeatherAnalysisBean() {
}
public String getDate() {
String s = (date.getYear()+1900) + "-" + (date.getMonth()+1) + "-" + date.getDate();
return s;
}
public void setDate(String s) throws ParseException {
this.date = new SimpleDateFormat("yyyy-MM-dd").parse(s);
}
public int getMinTemp() {
return minTemp;
}
public void setMinTemp(int minTemp) {
this.minTemp = minTemp;
}
public int getMaxTemp() {
return maxTemp;
}
public void setMaxTemp(int maxTemp) {
this.maxTemp = maxTemp;
}
public String getWeather() {
String[] weatherList = {"阴", "晴", "多云", "霾", "小雨", "小雨转阴", "多云转小雨", "小雨到中雨", "小雨转多云", "中雨", "阴到小雨", "小雪", "中雪", "扬沙", "大雪", "雨夹雪","阴转小雨", "小雨到大雨", "晴转小雨", "风", "大雨", "中雨到大雨", "阴到中雨", "雾"};
int index;
for (index = 0; index < 24; index++) {
if (weather[index]) {
return weatherList[index];
}
}
return null;
}
public void setWeather(String s) {
for (int i = 0; i < 24; i++) {
weather[i] = false;
}
String[] weatherList = {"阴", "晴", "多云", "霾", "小雨", "小雨转阴", "多云转小雨", "小雨到中雨", "小雨转多云", "中雨", "阴到小雨", "小雪", "中雪", "扬沙", "大雪", "雨夹雪","阴转小雨", "小雨到大雨", "晴转小雨", "风", "大雨", "中雨到大雨", "阴到中雨", "雾"};
int index;
for (index = 0; index < 24; index++) {
if (weatherList[index].equals(s)) {
weather[index] = true;
break;
}
}
}
public void computeWeather() {
int index;
for (index = 0; index < 24; index++) {
if (weather[index]) {
weatherArrays[index]++;
break;
}
}
}
public void setTemp() {
int month = date.getMonth();
minTempArrays[month] += minTemp;
maxTempArrays[month] += maxTemp;
}
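public void computeTemp() {
// Turn the monthly sums into averages. The fixed day counts assume a non-leap year with one record for every day.
int[] days = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};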
for (int i = 0; i < 12; i++) {
minTempArrays[i] /= days[i];
maxTempArrays[i] /= days[i];
}
}
public String getMinTempArrays() {
String result = "{";
int i;
for (i = 0; i < minTempArrays.length-1; i++) {
result = result + minTempArrays[i] + ",";
}
result = result + minTempArrays[i] + "}";
return result;
}
public String getMaxTempArrays() {
String result = "{";
int i;
for (i = 0; i < maxTempArrays.length-1; i++) {
result = result + maxTempArrays[i] + ",";
}
result = result + maxTempArrays[i] + "}";
return result;
}
public String getWeatherArrays() {
String result = "{";
int i;
for (i = 0; i < weatherArrays.length-1; i++) {
result = result + weatherArrays[i] + ",";
}
result = result + weatherArrays[i] + "}";
return result;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(date.getTime());
out.writeInt(minTemp);
out.writeInt(maxTemp);
for (float v : minTempArrays) {
out.writeFloat(v);
}
for (float v : maxTempArrays) {
out.writeFloat(v);
}
for (boolean v : weather) {
out.writeBoolean(v);
}
}
@Override
public void readFields(DataInput in) throws IOException {
this.date = new Date(in.readLong());
this.minTemp = in.readInt();
this.maxTemp = in.readInt();
for (int i=0; i < 12; i++) {
this.minTempArrays[i] = in.readFloat();
}
for (int i=0; i < 12; i++) {
this.maxTempArrays[i] = in.readFloat();
}
for (int i=0; i < 24; i++) {
this.weather[i] = in.readBoolean();
}
}
@Override
public String toString() {
return getMinTempArrays() + "\t" + getMaxTempArrays() + "\t" + getWeatherArrays() ;
}
}
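The averaging done by setTemp() and computeTemp() can be illustrated outside Hadoop. The sketch below is a plain Python restatement of that arithmetic, not part of the job; the sample temperatures are invented. It also shows a limitation of dividing by a fixed day count: a month with missing days gets an average that is too low.

```python
# Same fixed day counts as in computeTemp(): a non-leap year.
DAYS_IN_MONTH = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]

def monthly_means(records):
    """records: iterable of (month_index 0..11, temperature) pairs."""
    sums = [0.0] * 12
    for month, temp in records:
        sums[month] += temp              # what setTemp() does per record
    # What computeTemp() does once all records have been added.
    return [s / d for s, d in zip(sums, DAYS_IN_MONTH)]

# Hypothetical data: a complete January at 10 degrees, and a February
# with a single 28-degree record.
records = [(0, 10)] * 31 + [(1, 28)]
means = monthly_means(records)
print(means[0], means[1])
```

With complete months the result is the true mean; the February value here is diluted by the 27 missing days, so the approach relies on the crawl covering every day of the year.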
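- Write the mapper class
package com.owem.mapreduce.weatherAnalysis;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.text.ParseException;
public class WeatherAnalysisMapper extends Mapper<LongWritable, Text, Text, WeatherAnalysisBean> {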
private Text outK = new Text();
private WeatherAnalysisBean outV = new WeatherAnalysisBean();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WeatherAnalysisBean>.Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split("\t");
String cityId = split[0];
String date = split[1];
String maxTemp = split[2];
String minTemp = split[3];
String weather = split[4];
outK.set(cityId);
try {
outV.setDate(date);
} catch (ParseException e) {
throw new RuntimeException(e);
}
outV.setMaxTemp((int) Float.parseFloat(maxTemp));
outV.setMinTemp((int) Float.parseFloat(minTemp));
outV.setWeather(weather);
context.write(outK, outV);
}
}
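The field handling in the mapper can be tried on a single line without a cluster. This is a plain Python sketch of the same parsing steps, assuming the column order the mapper reads (cityId, date, maxTemp, minTemp, weather); the sample line is made up.

```python
from datetime import datetime

def map_line(line):
    """Split one tab-separated weather row into (key, value), as the mapper does."""
    city_id, day, max_temp, min_temp, weather = line.rstrip("\n").split("\t")[:5]
    value = {
        "date": datetime.strptime(day, "%Y-%m-%d").date(),
        # int(float(x)) truncates toward zero, like the Java cast (int) Float.parseFloat(x).
        "maxTemp": int(float(max_temp)),
        "minTemp": int(float(min_temp)),
        "weather": weather,
    }
    return city_id, value

# Hypothetical input row.
key, value = map_line("1\t2021-04-01\t20.5\t-3.7\t晴")
print(key, value)
```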
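- Write the reducer class
package com.owem.mapreduce.weatherAnalysis;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.ParseException;
public class WeatherAnalysisReducer extends Reducer<Text, WeatherAnalysisBean, Text, WeatherAnalysisBean> {
@Override
protected void reduce(Text key, Iterable<WeatherAnalysisBean> values, Reducer<Text, WeatherAnalysisBean, Text, WeatherAnalysisBean>.Context context) throws IOException, InterruptedException {
// Use a fresh accumulator for every key so that one city's totals do not leak into the next city's averages.
WeatherAnalysisBean outV = new WeatherAnalysisBean();
for (WeatherAnalysisBean value : values) {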
try {
outV.setDate(value.getDate());
outV.setMaxTemp(value.getMaxTemp());
outV.setMinTemp(value.getMinTemp());
outV.setWeather(value.getWeather());
outV.computeWeather();
outV.setTemp();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
outV.computeTemp();
context.write(key, outV);
}
}
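The reduce step receives all values for one city and aggregates them. The sketch below simulates the grouping and the weather counting in plain Python; it is an illustration of the idea rather than Hadoop code, and the sample pairs are invented. Each key gets its own counter, so counts from one city never mix with another's.

```python
from collections import Counter, defaultdict

def shuffle(pairs):
    """Group mapper output by key, as the framework does before reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_weather(values):
    """Count how many days each weather label occurs for one city."""
    return Counter(values)   # a new accumulator for every key

# Hypothetical mapper output: (cityId, weather label).
pairs = [("1", "晴"), ("2", "阴"), ("1", "晴"), ("1", "多云")]
result = {key: reduce_weather(values) for key, values in shuffle(pairs).items()}
print(result)
```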
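- Write the driver class
package com.owem.mapreduce.weatherAnalysis;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WeatherAnalysisDriver {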
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(WeatherAnalysisDriver.class);
job.setMapperClass(WeatherAnalysisMapper.class);
job.setReducerClass(WeatherAnalysisReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(WeatherAnalysisBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(WeatherAnalysisBean.class);
FileInputFormat.setInputPaths(job, new Path("/Users/owem/Desktop/Profession/Java/MapReduceDemo/input/WeatherData/weather"));
FileOutputFormat.setOutputPath(job, new Path("/Users/owem/Desktop/Profession/Java/MapReduceDemo/output/WeatherData"));
job.waitForCompletion(true);
}
}
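Running Locally
Run WeatherAnalysisDriver
The output files are generated
View the result: part-r-00000
Note: each output line has the form: cityId [average minimum temperature per month] [average maximum temperature per month] [distribution of weather conditions]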
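For later plotting, a result line can be read back into numbers. The sketch below assumes the layout produced by the bean's toString(): the key followed by three tab-separated, brace-delimited, comma-separated arrays (12 floats, 12 floats, 24 integers). The sample line is synthetic.

```python
def parse_array(field, cast):
    """Turn '{a,b,c}' into a list of numbers."""
    return [cast(item) for item in field.strip("{}").split(",")]

def parse_result_line(line):
    """Parse one line of part-r-00000 into a dictionary."""
    city_id, mins, maxs, weather = line.rstrip("\n").split("\t")
    return {
        "cityId": city_id,
        "minTemp": parse_array(mins, float),
        "maxTemp": parse_array(maxs, float),
        "weatherCounts": parse_array(weather, int),
    }

# Synthetic example line, not real output.
line = (
    "1\t{" + ",".join(["1.5"] * 12) + "}\t{"
    + ",".join(["9.0"] * 12) + "}\t{"
    + ",".join(["0"] * 24) + "}"
)
parsed = parse_result_line(line)
print(parsed["cityId"], len(parsed["minTemp"]), len(parsed["weatherCounts"]))
```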
To build the jar with Maven, add the following packaging plugins
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Deployment Test
- Change the input and output paths in the driver class so that they come from the command-line arguments
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
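- Build the jar with Maven. Two jars are generated: the larger one bundles the dependencies and the smaller one does not (the smaller one is normally used).
- Upload the jar to Linux and make sure the file to be analyzed is on HDFS
- Rename the jar to WDA.jar and upload it to the hadoop directory
- Delete the _SUCCESS file in the weather folder on HDFS
- Run the jar
hadoop jar WDA.jar com.owem.mapreduce.weatherAnalysis.WeatherAnalysisDriver /user/WeatherData/weather /user/WeatherData/weatherOutput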
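Importing Data into Hive
Start Hive
Start Hive after making sure the Hadoop cluster is running:
Create the Database and Load the Data
Create the database
Switch to the WeatherData database and create the tables
Table definitions:
- city table
create table if not exists city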
(
cityID int,
cityName varchar(20)
);
- airQuality table
create table if not exists airQuality
(
CityID int,
`date` date,
quality_level varchar(20),
AQI int,
PM2_5 int,
PM10 int,
So2 int,
No2 int,
Co float,
O3 int
);
- weather table
create table if not exists weather
(
cityID int,
weather varchar(20),
`date` date,
maxTemp int,
minTemp int
);
View the tables:
Import the MySQL data into Hive
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table city -m 1 \
--hive-import \
--hive-table WeatherData.city
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table airQuality -m 1 \
--hive-import \
--hive-table WeatherData.airQuality
bin/sqoop import \
--connect jdbc:mysql://127.0.0.1:3306/WeatherData \
--username root \
--password 123456 \
--table weather -m 1 \
--hive-import \
--hive-table WeatherData.weather
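Handling Missing Values
For example, to check that the date column of the airQuality table has no nulls:
select count(1) from airQuality where `date` is null;
Similar checks on the other columns show that the import worked correctly and no values are missing.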
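Repeating the check for every column is mechanical, so the queries can be generated. The sketch below builds one null-count query per airQuality column, using the column names from the table definition; the helper name is hypothetical, and the generated statements would still be run in Hive by hand.

```python
# Column names taken from the airQuality table definition.
AIR_QUALITY_COLUMNS = [
    "CityID", "date", "quality_level", "AQI", "PM2_5",
    "PM10", "So2", "No2", "Co", "O3",
]

def null_count_queries(table, columns):
    """Build one 'count rows where column is null' query per column."""
    # Backticks quote the column name, which is required for `date`.
    return [f"select count(1) from {table} where `{col}` is null;" for col in columns]

queries = null_count_queries("airQuality", AIR_QUALITY_COLUMNS)
for query in queries:
    print(query)
```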
Analyzing the Data in Hive
Count the sunny days in each month
select cityName, month(`date`), count(1)
from city, weather
where city.cityID = weather.cityID and weather = '晴'
group by cityName, month(`date`);
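The query joins the two tables on cityID, keeps the rows whose weather is '晴' (sunny), and counts them per city and month. The same logic in plain Python, as a sketch on invented rows:

```python
from collections import Counter
from datetime import date

def sunny_days_per_month(city_names, weather_rows):
    """city_names: {cityID: cityName}; weather_rows: (cityID, weather, date) tuples."""
    counts = Counter()
    for city_id, weather, day in weather_rows:
        # Join on cityID and filter on the sunny label, then group by (city, month).
        if weather == "晴" and city_id in city_names:
            counts[(city_names[city_id], day.month)] += 1
    return counts

# Hypothetical sample data.
city_names = {1: "Beijing"}
weather_rows = [
    (1, "晴", date(2021, 4, 1)),
    (1, "晴", date(2021, 4, 2)),
    (1, "阴", date(2021, 4, 3)),
    (1, "晴", date(2021, 5, 1)),
]
counts = sunny_days_per_month(city_names, weather_rows)
print(counts)
```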
Preliminary analysis of air quality
Air quality distribution by month:
select month(`date`), quality_level, count(1)
from airQuality, city
where city.cityID = airQuality.cityID
group by month(`date`), quality_level
order by count(1) desc ;
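The results show that air quality is generally good in months 6, 7, 8, 9, and 10, and tends to be worse in months 10, 11, 12, 1, 2, and 3.
This is even more evident in the query results for Beijing alone: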
select month(`date`), quality_level, count(1)
from airQuality, city
where city.cityID = airQuality.cityID and city.cityID = 1
group by month(`date`), quality_level
order by count(1) desc ;
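Note also that the north and the south differ considerably in October, November, and December.
Air quality distribution by month for each city: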
select cityName, month(`date`), quality_level, count(1)
from airQuality, city
where city.cityID = airQuality.cityID
group by cityName, month(`date`), quality_level;
Query the number of haze days
select cityName, count(1)
from city, weather
where city.cityID = weather.cityID and weather.weather like '%霾%'
group by cityName;
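Only Beijing had haze, on three days.
Query the weather conditions on those days
select cityName, weather.`date`, weather.weather, AQI, PM2_5, maxTemp, minTemp
from city, weather, airQuality
where city.cityID = weather.cityID and weather.cityID = airQuality.cityID and weather.weather like '%霾%' and weather.`date` = airQuality.`date`;
Air quality on haze days is not necessarily very poor. A plausible explanation is that the haze occurred only in the early morning, when temperatures were low, and cleared quickly as the temperature rose, so the AQI for the whole day is not necessarily high.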
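Matching a haze day to its air quality reading requires both the city and the date to agree. The sketch below performs that two-column match in plain Python; the rows and AQI values are invented for illustration and are not the project's results.

```python
from datetime import date

def haze_days_with_aqi(weather_rows, air_rows):
    """weather_rows: (cityID, date, weather); air_rows: (cityID, date, AQI)."""
    # Index the air quality rows by the composite key (cityID, date).
    aqi_by_key = {(city_id, day): aqi for city_id, day, aqi in air_rows}
    result = []
    for city_id, day, weather in weather_rows:
        key = (city_id, day)
        if "霾" in weather and key in aqi_by_key:
            result.append((city_id, day, weather, aqi_by_key[key]))
    return result

# Hypothetical sample data.
weather_rows = [
    (1, date(2021, 11, 5), "霾"),
    (1, date(2021, 11, 6), "晴"),
    (2, date(2021, 11, 5), "多云"),
]
air_rows = [
    (1, date(2021, 11, 5), 80),
    (2, date(2021, 11, 5), 150),
    (1, date(2021, 11, 6), 40),
]
matches = haze_days_with_aqi(weather_rows, air_rows)
print(matches)
```

Keying on the pair rather than on the city alone is what prevents a haze day from being paired with another day's or another city's reading.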
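Data Visualization
Export the data to an Excel file for analysis
Plot a line chart of air quality from 2021-4-1 to 2022-3-31
Plot a temperature chart from 2021-4-1 to 2022-3-31
Plot a temperature chart for 2021-4 to 2022-3 from the MapReduce results
Plot a chart of weather condition shares for 2021-4 to 2022-3 from the MapReduce results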
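One simple way to get query or MapReduce results into Excel is to write them as CSV, which Excel opens directly. The sketch below assumes CSV is an acceptable stand-in for a native Excel file; the column names and values are placeholders.

```python
import csv
import io

def to_csv(header, rows):
    """Serialize a header and rows to CSV text that Excel can open."""
    buffer = io.StringIO(newline="")
    writer = csv.writer(buffer)
    writer.writerow(header)
    writer.writerows(rows)
    return buffer.getvalue()

# Hypothetical monthly averages.
text = to_csv(["month", "avgMaxTemp"], [[4, 20.5], [5, 26.0]])
print(text)
```

To produce a file, the returned text can be written with open(path, "w", newline="", encoding="utf-8-sig"); the BOM variant of UTF-8 helps Excel detect the encoding when city names or weather labels contain Chinese characters.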