IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 大数据集群硬盘资源监控 -> 正文阅读

[大数据]大数据集群硬盘资源监控

需求

  1. 统计集群服务器硬盘每日增量
  2. 统计HDFS每天数据增量
  3. 统计数仓每天数据增量
  4. 统计数仓ODS层和DWD层每天数据增量

硬盘资源监控命令

监控项命令默认单位
某台服务器硬盘使用量dfKB
HDFS总使用量hadoop fs -dfB
HDFS上某个文件夹的大小hadoop fs -du -s /B

MySQL存储数据

建库

CREATE DATABASE data_management;
USE data_management;

建表

CREATE TABLE item_info(
  item_id    INT           PRIMARY KEY   COMMENT '监控项ID',
  item_name  VARCHAR(255)  NOT NULL      COMMENT '监控项名称',
  item_unit  VARCHAR(63)   DEFAULT NULL  COMMENT '计量单位'
)COMMENT='监控项信息表';
CREATE TABLE item_records(
  item_id           INT       COMMENT '监控项ID',
  monitoring_time   DATETIME  COMMENT '监测时间',
  monitoring_value  BIGINT    COMMENT '监测值',
  PRIMARY KEY(item_id,monitoring_time)
)COMMENT='监控项流水表';

插入数据

INSERT item_info VALUES
(1,'HDFS数据总使用量','B'),
(2,'数仓占用量(含副本)','B'),
(3,'数仓ODS层占用量(含副本)','B'),
(4,'数仓DWD层占用量(含副本)','B'),
(5,'hadoop105服务器硬盘使用','KB'),
(6,'hadoop106服务器硬盘使用','KB'),
(7,'hadoop107服务器硬盘使用','KB');

创建视图

-- 最近30天,HDFS每天总使用量(平均值)
CREATE OR REPLACE VIEW hdfs_used_ymd AS
SELECT
    DATE(monitoring_time) AS ymd,
    AVG(monitoring_value) AS b
FROM item_records
WHERE item_id=1 AND DATEDIFF(NOW(),monitoring_time)<31
GROUP BY DATE(monitoring_time);
-- 计算每天变化量
CREATE OR REPLACE VIEW hdfs_ymd AS
SELECT
    t1.ymd AS ymd,
    t1.b AS b,
    t1.b-t2.b AS increase
FROM hdfs_used_ymd t1
INNER JOIN hdfs_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);
-- 最近30天各服务器硬盘使用(平均值)
CREATE OR REPLACE VIEW df_used_hosts_ymd AS
SELECT
    t1.item_name AS hostname,
    DATE(t2.monitoring_time) AS ymd,
    AVG(t2.monitoring_value) AS kb
FROM(
    SELECT item_id,item_name FROM item_info
    WHERE item_name LIKE '%服务器硬盘使用'
)t1 LEFT JOIN (
    -- 筛选最近30天
    SELECT item_id,monitoring_time,monitoring_value
    FROM item_records
    WHERE DATEDIFF(NOW(),monitoring_time)<31
)t2 ON t1.item_id=t2.item_id
GROUP BY DATE(t2.monitoring_time),t1.item_id;
-- 最近30天所有服务器硬盘使用总和
CREATE OR REPLACE VIEW df_used_ymd AS
SELECT ymd,SUM(kb)/1024/1024 AS gb
FROM df_used_hosts_ymd
GROUP BY ymd;
-- 最近30天所有服务器硬盘总使用及变化
CREATE OR REPLACE VIEW df_ymd AS
SELECT
    t1.ymd AS ymd,
    t1.gb AS gb,
    t1.gb-t2.gb AS increase
FROM df_used_ymd t1
INNER JOIN df_used_ymd t2
ON t1.ymd=DATE_ADD(t2.ymd,INTERVAL 1 DAY);

旧版MySQL视图不支持注释,因此创建一个元数据表

CREATE TABLE view_metadata(
  metadata_name  VARCHAR(255)  PRIMARY KEY  COMMENT '元数据名称',
  explanation    VARCHAR(255)  NOT NULL     COMMENT '元数据说明'
)COMMENT='视图的元数据信息';
-- 创建视图后,写入视图说明
INSERT view_metadata VALUES
('hdfs_ymd','最近30天,HDFS每天总使用量及日环比'),
('hdfs_ymd.increase','日环比'),
('df_ymd','最近30天,所有服务器硬盘总使用及变化'),
('df_ymd.increase','日环比');

Python3获取数据

from time import strftime
from subprocess import check_output
from re import split
from pymysql.connections import Connection  # conda install pymysql

# 要监控的HDFS文件夹,Key对应MySQL的data_management.item_info的item_id
HDFS_PATH = {
    2: '/user/hive/warehouse/ec.db',
    3: '/user/hive/warehouse/ec.db/ods',
    4: '/user/hive/warehouse/ec.db/dwd',
}
# 要监控的节点硬盘,Key对应MySQL的data_management.item_info的item_id
HOSTNAMES = {
    5: 'hadoop105',
    6: 'hadoop106',
    7: 'hadoop107',
}


def _cmd(command) -> str:
    """指定用户执行Shell命令,并返回结果"""
    return check_output('sudo -i -u yellow ' + command, shell=True).decode().strip()


def _mysql_formatting(a: int, b: str, c: int) -> dict:
    """数据写入MySQL的字段格式"""
    return {'item_id': a, 'monitoring_time': b, 'monitoring_value': c}


def _now() -> str:
    return strftime('%Y-%m-%d %H:%M:%S')


def hadoop_fs_df() -> dict:
    """查询HDFS数据量"""
    result = _cmd('hadoop fs -df')
    now = _now()
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(1, now, int(dt['Used']))


def hadoop_fs_du(item_id=2) -> dict:
    """查询HDFS文件夹大小"""
    result = _cmd('hadoop fs -du -s ' + HDFS_PATH[item_id])
    now = _now()
    print(result)
    value = split(r'\s+', result)[1]
    return _mysql_formatting(item_id, now, int(value))


def hadoop_fs_du_ods() -> dict:
    """查询ODS层文件夹大小"""
    return hadoop_fs_du(3)


def hadoop_fs_du_dwd() -> dict:
    """查询DWD层文件夹大小"""
    return hadoop_fs_du(4)


def df(item_id) -> dict:
    """查询指定服务器硬盘使用,执行用户具有sudo权限,集群之间root用户可免密登录"""
    hostname = HOSTNAMES[item_id]
    cmd = 'sudo ssh {} df /'.format(hostname)
    print(cmd)
    result = _cmd(cmd)
    now = _now()
    dt = {k: v for k, v in zip(*(split(r'\s+', line) for line in result.split('\n')))}
    print(dt)
    return _mysql_formatting(item_id, now, int(dt['Used']))


def df_hadoop105() -> dict:
    """查询hadoop105服务器硬盘使用"""
    return df(5)


def df_hadoop106() -> dict:
    """查询hadoop106服务器硬盘使用"""
    return df(6)


def df_hadoop107() -> dict:
    """查询hadoop107服务器硬盘使用"""
    return df(7)


class Mysql:
    def __init__(self, host, database, password, user='root'):
        self.db = Connection(
            user=user,
            password=password,
            host=host,
            database=database,
            port=3306,
            charset='UTF8')
        self.cursor = self.db.cursor()

    def __del__(self):
        self.cursor.close()
        self.db.close()

    def commit(self, sql):
        print(sql)
        try:
            self.cursor.execute(sql)
            self.db.commit()
        except Exception as e:
            print(e)

    def insert(self, dt, tb):
        ls = [(k, v) for k, v in dt.items() if v is not None]
        sql = 'INSERT %s (' % tb + ','.join(i[0] for i in ls) + \
              ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')'
        self.commit(sql)


if __name__ == '__main__':
    db = Mysql(
        host='hadoop105',
        password='密码',
        database='data_management')
    TABLE = 'item_records'
    db.insert(hadoop_fs_df(), TABLE)
    db.insert(hadoop_fs_du(), TABLE)
    db.insert(hadoop_fs_du_ods(), TABLE)
    db.insert(hadoop_fs_du_dwd(), TABLE)
    db.insert(df_hadoop105(), TABLE)
    db.insert(df_hadoop106(), TABLE)
    db.insert(df_hadoop107(), TABLE)

定时任务

crontab -e
# 每天跑一次
0 0 * * * /home/miniconda/miniconda3/bin/python /home/miniconda/df.py

或使用DolphinScheduler

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-06 13:08:12  更:2022-03-06 13:10:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:33:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码