需求
- 统计集群服务器硬盘每日增量
- 统计HDFS每天数据增量
- 统计数仓每天数据增量
- 统计数仓ODS层和DWD层每天数据增量
硬盘资源监控命令
监控项 | 命令 | 默认单位 |
---|
某台服务器硬盘使用量 | df | KB | HDFS总使用量 | hadoop fs -df | B | HDFS上某个文件夹的大小 | hadoop fs -du -s / | B |
MySQL存储数据
建库
CREATE DATABASE data_management;
USE data_management;
建表
CREATE TABLE item_info(
item_id INT PRIMARY KEY COMMENT '监控项ID',
item_name VARCHAR(255) NOT NULL COMMENT '监控项名称',
item_unit VARCHAR(63) DEFAULT NULL COMMENT '计量单位'
)COMMENT='监控项信息表';
CREATE TABLE item_records(
item_id INT COMMENT '监控项ID',
monitoring_time DATETIME COMMENT '监测时间',
monitoring_value BIGINT COMMENT '监测值',
PRIMARY KEY(item_id,monitoring_time)
)COMMENT='监控项流水表';
插入数据
INSERT item_info VALUES
(1,'HDFS数据总使用量','B'),
(2,'数仓占用量(含副本)','B'),
(3,'数仓ODS层占用量(含副本)','B'),
(4,'数仓DWD层占用量(含副本)','B'),
(5,'hadoop105服务器硬盘使用','KB'),
(6,'hadoop106服务器硬盘使用','KB'),
(7,'hadoop107服务器硬盘使用','KB');
创建视图
CREATE OR REPLACE VIEW hdfs_used_ymd AS
SELECT
DATE(monitoring_time) AS ymd,
AVG(monitoring_value) AS b
FROM item_records
WHERE item_id=1 AND DATEDIFF(NOW(),monitoring_time)<31
GROUP BY DATE(monitoring_time);
CREATE OR REPLACE VIEW hdfs_ymd AS
SELECT
t1.ymd AS ymd,
t1.b AS b,
t1.b-t2.b AS increase
FROM hdfs_used_ymd t1
INNER JOIN hdfs_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);
CREATE OR REPLACE VIEW df_used_hosts_ymd AS
SELECT
t1.item_name AS hostname,
DATE(t2.monitoring_time) AS ymd,
AVG(t2.monitoring_value) AS kb
FROM(
SELECT item_id,item_name FROM item_info
WHERE item_name LIKE '%服务器硬盘使用'
)t1 LEFT JOIN (
SELECT item_id,monitoring_time,monitoring_value
FROM item_records
WHERE DATEDIFF(NOW(),monitoring_time)<31
)t2 ON t1.item_id=t2.item_id
GROUP BY DATE(t2.monitoring_time),t1.item_id;
CREATE OR REPLACE VIEW df_used_ymd AS
SELECT ymd,SUM(kb)/1024/1024 AS gb
FROM df_used_hosts_ymd
GROUP BY ymd;
CREATE OR REPLACE VIEW df_ymd AS
SELECT
t1.ymd AS ymd,
t1.gb AS gb,
t1.gb-t2.gb AS increase
FROM df_used_ymd t1
INNER JOIN df_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);
旧版MySQL视图不支持注释,因此创建一个元数据表
CREATE TABLE view_metadata(
metadata_name VARCHAR(255) PRIMARY KEY COMMENT '元数据名称',
explanation VARCHAR(255) NOT NULL COMMENT '元数据说明'
)COMMENT='视图的元数据信息';
INSERT view_metadata VALUES
('hdfs_ymd','最近30天,HDFS每天总使用量及日环比'),
('hdfs_ymd.increase','日环比'),
('df_ymd','最近30天,所有服务器硬盘总使用及变化'),
('df_ymd.increase','日环比');
Python3获取数据
from time import strftime
from subprocess import check_output
from re import split
from pymysql.connections import Connection
HDFS_PATH = {
2: '/user/hive/warehouse/ec.db',
3: '/user/hive/warehouse/ec.db/ods',
4: '/user/hive/warehouse/ec.db/dwd',
}
HOSTNAMES = {
5: 'hadoop105',
6: 'hadoop106',
7: 'hadoop107',
}
def _cmd(command) -> str:
"""指定用户执行Shell命令,并返回结果"""
return check_output('sudo -i -u yellow ' + command, shell=True).decode().strip()
def _mysql_formatting(a: int, b: str, c: int) -> dict:
"""数据写入MySQL的字段格式"""
return {'item_id': a, 'monitoring_time': b, 'monitoring_value': c}
def _now() -> str:
return strftime('%Y-%m-%d %H:%M:%S')
def hadoop_fs_df() -> dict:
"""查询HDFS数据量"""
result = _cmd('hadoop fs -df')
now = _now()
dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
print(dt)
return _mysql_formatting(1, now, int(dt['Used']))
def hadoop_fs_du(item_id=2) -> dict:
"""查询HDFS文件夹大小"""
result = _cmd('hadoop fs -du -s ' + HDFS_PATH[item_id])
now = _now()
print(result)
value = split(r'\s+', result)[1]
return _mysql_formatting(item_id, now, int(value))
def hadoop_fs_du_ods() -> dict:
"""查询ODS层文件夹大小"""
return hadoop_fs_du(3)
def hadoop_fs_du_dwd() -> dict:
"""查询DWD层文件夹大小"""
return hadoop_fs_du(4)
def df(item_id) -> dict:
"""查询指定服务器硬盘使用,执行用户具有sudo权限,集群之间root用户可免密登录"""
hostname = HOSTNAMES[item_id]
cmd = 'sudo ssh {} df /'.format(hostname)
print(cmd)
result = _cmd(cmd)
now = _now()
dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
print(dt)
return _mysql_formatting(item_id, now, int(dt['Used']))
def df_hadoop105() -> dict:
"""查询hadoop105服务器硬盘使用"""
return df(5)
def df_hadoop106() -> dict:
"""查询hadoop106服务器硬盘使用"""
return df(6)
def df_hadoop107() -> dict:
"""查询hadoop107服务器硬盘使用"""
return df(7)
class Mysql:
def __init__(self, host, database, password, user='root'):
self.db = Connection(
user=user,
password=password,
host=host,
database=database,
port=3306,
charset='UTF8')
self.cursor = self.db.cursor()
def __del__(self):
self.cursor.close()
self.db.close()
def commit(self, sql):
print(sql)
try:
self.cursor.execute(sql)
self.db.commit()
except Exception as e:
print(e)
def insert(self, dt, tb):
ls = [(k, v) for k, v in dt.items() if v is not None]
sql = 'INSERT %s (' % tb + ','.join(i[0] for i in ls) + \
') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
self.commit(sql)
if __name__ == '__main__':
db = Mysql(
host='hadoop105',
password='密码',
database='data_management')
TABLE = 'item_records'
db.insert(hadoop_fs_df(), TABLE)
db.insert(hadoop_fs_du(), TABLE)
db.insert(hadoop_fs_du_ods(), TABLE)
db.insert(hadoop_fs_du_dwd(), TABLE)
db.insert(df_hadoop105(), TABLE)
db.insert(df_hadoop106(), TABLE)
db.insert(df_hadoop107(), TABLE)
定时任务
crontab -e
0 0 * * * /home/miniconda/miniconda3/bin/python /home/miniconda/df.py
或使用DolphinScheduler
|