IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> KAFKA-HDFS q分析数据保障机制 -> 正文阅读

[大数据]KAFKA-HDFS q分析数据保障机制

项目背景

kudu集群的崩溃会造成qlibra数据的丢失,为了保障qlibra数据的准确性,我们建立了一套qlibra的数据保障机制。

项目技术细节

kafka-hdfs消费程序

在原本的方案中,实时数据全部由kafka-kudu的spark消费程序负责消费,完全依赖于kudu的可靠性。因此,我们决定增加kafka-hdfs的数据消费线路,将数据备份至hdfs大集群中。当出现数据接入异常时,使用备份数据还原丢失的数据。

kafka-hdfs数据消费程序使用多线程的方式,为每个kafka partition创建一个独立的消费线程,每消费完成一条数据后手动提交偏移量以确保数据不丢失。

消费逻辑为:每个线程各自为所有项目按小时创建本地临时文件,接到kafka数据后,将数据写入对应的临时文件中。

程序还会启动一个同步线程负责定期同步指定路径下的临时文件。除了当前时刻的临时文件外,所有其他的数据文件都会同步到hdfs大集群中的指定目录:"hdfs://qunarcluster/user/flightdev/hive/warehouse/qlibra.db/${qlibra项目名称}/${date:yyyymmdd}/"。

临时文件中,各个字段使用\t进行分隔,因此可以使用hive挂载外表对备份数据进行查询。

目前程序已经修改为分布式程序,可通过下面脚本启停服务:

  • 启动脚本:?/home/q/tools/devbin/qlibra_kafka2hdfs_healthcheck.sh
  • 停服务脚本:?/home/q/tools/devbin/qlibra_kafka2hdfs_before_deploy.sh

目前节点间依赖redis作为分布式状态存储,各节点会从redis中取出自己消费的分区标识,在正常结束后,会将正在消费的分区标识放回redis中。

如果以初始化模式启动,会重建redis中的状态,因此初始化启动必须在所有节点都已下线后才能进行。

启动脚本支持参数:

参数名称

参数含义

样例

env环境标识prod
speed消费速率300
nodes服务节点数量4
init是否以初始化模式重启服务false
topic消费的kafka主题custom_flight_f_data_sensor_sink
group消费kafka的分组spark2hdfsd
hdfsDb默认存放数据的hive库qlibra
hdfsSitePathhdfds-site.xml在节点的路径/home/q/hadoop-2.2.0/etc/hadoop/hdfs-site.xml
hdfsRootPath文件上传的hdfs根路径

/user/flightdev/hive/warehouse/

但可用性较差,节点下线或新增节点需人工干预

  • 正常节点下线(如搬迁机柜,机器重启)
    1. crontab下线
    2. 调用停服务脚本,等待脚本执行完毕
    3. 机器恢复可用
    4. crontab上线
  • 异常节点下线
    1. 所有节点crontab下线
    2. 所有节点调用停服务脚本,等待脚本执行完毕
    3. 挑选一台可用节点以初始化模式重启服务
    4. 等待服务启动成功
    5. 剩余可用节点上线crontab脚本,逐台等待服务启动成功后才可操作下一台
  • 新增节点
    1. 所有旧节点逐台执行“crontab下线-服务下线-调整crontab的nodes参数-服务启动”步骤,一台一台操作
    2. 新节点安装hive环境,手动创建程序路径/home/q/onlinejars/tmp/qlibra/以及父路径,申请机器访问hdfs的账号权限
    3. 使用portal在新节点中部署程序
    4. 新节点配置相同crontab
  • 减少节点
    1. 所有节点逐台执行“crontab下线-服务下线”步骤,一台一台操作
    2. 所有待留存节点逐台执行“调整crontab的nodes参数-服务启动”步骤,一台一台操作

后续可考虑引入master角色进行自动恢复,或变更技术方案,将程序更换成web工程依托spring的kafka框架进行可用性保障。

hdfs小文件处理

由于对每个项目来说,每一小时均有多个线程为其创建本地临时文件(目前线上的kafka partition数量为18个,也就是说每个小时一个项目下都会有18个消费线程创建的本地临时文件)。这一方案会导致大量的小文件问题。

目前,已经挂载了小文件定时清理脚本,将指定项目对应hdfs目录中的小文件进行打包压缩后重新上传。这一脚本已经挂载在了调度系统中,每天合并两天前产生的数据文件。

脚本路径为:“l-dw2.f.cn5 /home/q/script/qlibra/merge_hdfs_small_file.sh”

hdfs-kudu数据恢复(废除)

如上文所提,hdfs大集群中的数据文件可以通过挂载hive分区外表的形式进行查询。因此,我们可以通过impala对hive表和kudu表的SQL操作支持,还原指定时间段内的数据。

目前已经完成了相关的数据还原脚本,并且在调度系统中为一些重要的qlibra项目建立了无效的数据恢复任务,当有需要时,可以启动任务进行恢复。脚本支持天和小时级别的数据还原。

脚本的逻辑如下:

    1. 在小集群中建立存放待恢复数据的临时表
    2. 将备份数据文件从大集群hdfs路径中拷贝到小集群中
    3. 将kudu中指定时间段的数据删除
    4. 将小集群hive中的数据插入kudu表中
    5. 删除小集群临时表

实际在使用时,发现如果还原的数据量过多(经验值为500万条以上),会给kudu集群造成较大的写入压力,从而将某个节点压死,引发集群崩溃。

因此脚本支持指定分钟片段,将数据划分为多批进行删除和插入操作。但是这一做法会产生额外的数据扫描操作,影响数据恢复的效率,使用时需要进行斟酌判断。

脚本路径为:“l-dw2.f.cn5 /home/q/script/qlibra/restore_hdfs_data_to_kudu.py”

报警处理

收到报警?

http://watcher.corp.qunar.com/dashboard/alarm/search?alarmName=kafka_to_hdfs_lag&alarmPath=qunar.team.flight.bigdata.qlibra&collapse=false&from=now-3h&to=now

查看具体报警分区

如果发生堆积报警,先查看一下各个分区的堆积情况,链接在下方

http://watcher.corp.qunar.com/dashboard/team/?path=qunar.team.flight.bigdata.qmark.kafka2hdfs&from=now-6h&to=now

http://watcher.corp.qunar.com/dashboard/apps/metric?currentPage=1&metric=custom_flight_f_data_sensor_sink.spark2hdfs.*&group=all_metrics&env=prod&path=qunar.corp.data.prism.ops_prism_monitor&nodeType=app_code

找到对应服务器

需要找到对应的机器,重启消费任务,并观察消费日志是否正常

机器列表在http://portal.corp.qunar.com/apptree/my-appcode/host?app_code=f_qlibra_kafka_to_hdfs&appcode_filter=f_qlibra_kafka_to_hdfs

查看错误日志

sudo crontab -l 可以看到定时任务的情况

*/2 * * * * sudo /home/q/tools/devbin/qlibra_kafka2hdfs_healthcheck.sh 开头的任务需要我们关注

重启服务

找到失败的任务所在的服务器,ps aux|grep hdfs,查看pid

sudo kill 指定pid? (注意:不能kill -9,因为会有钩子函数进行任务停止的状态恢复操作,强杀的话,会导致没有办法自动恢复任务;)

kill之后,crontab在2分钟左右会自动重启任务

重启后检查消费情况

重启之后,查看grep "待消费分区" /home/q/www/kafka2hdfs/logs/kafka2hdfs.2021-03-22.log? 检查这台机器消费的分区情况

?/home/q/www/kafka2hdfs/logs]$ tailf kafka2hdfs.2021-03-22.log

检查是否正常运行,能否正常消费数据。标识:正常获取元数据,并进行了数据的写入

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-17 12:41:07  更:2022-10-17 12:43:44 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/17 22:44:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码