一、背景需求介绍
实时监控系统需要满足对多种来源的数据进行告警,为提升系统的可扩展行和灵活性,采用动态规则配置来实现多种数据源、多种告警规则的实时告警
1、数据来源
车机应用端数据源:
数据源内容 | host | port | topic |
---|
Sentry 中的异常、奔溃 | tsp3dev07.para.com | 9092 | events | 车机端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-ivi |
云端数据源:
数据源内容 | host | port | topic |
---|
云端埋点数据 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | cloud_cp_data | 云端微服务日志 | tsp3dev01.para.com,tsp3dev01.para.com,tsp3dev03.para.com | 9092 | data-monitor-cloud |
二、系统架构设计
1、系统分层架构设计

本着高内聚低耦合的原则,实时告警系统采用分层设计的思想对整体的功能模块进行组合,其中:
1、Flink DataStream 层的功能是数据流在Flink内部的整体流向DAG图,如addSource 、connect 、process 、addSink ;
2、Flink Function 层的功能是对function的具体实现,如AlertManagerSinkFunction 、CustomMysqlSourceFunction 、RuleMatchBroadCastProcessFunction 等;
3、Service 层是业务的处理过程,如负责向AlertManager 传输数据的AlertManagerService 、负责规则同步、更新、维护、转化、匹配的 RulesService 。
2、业务模块设计

说明:业务上,需要告警的数据源目前有4中数据来源,分别是远端日志、云端微服务日志、车机端埋点、Sentry异常奔溃,其中Sentry 中的数据需要通过告警规则的筛选后发送到kafka中用于实时监控。设计上首先通过Driver中的class 路由到通用JSON告警模块或者Sentry异常奔溃业务处理模块,其次通过app.type 选择kafka中的数据源。
3、Flink DataStream 处理流程图
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7VDDELwQ-1627199664538)(images/Flink DataStream 处理流程图.png)]](https://img-blog.csdnimg.cn/b9f3ff1f84e245cd8a1063147792d353.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3poYW5namlhbl9lbmc=,size_16,color_FFFFFF,t_70)
说明:DataStream 处理流程图展示的是数据从Kafka消费后再Flink Function 中的流向关系,Driver 负责Flink程序的启动,通过class筛选路由到通用JSON告警或者Sentry异常崩溃模块,其中内部的逻辑比较相似:
1、首先Mysql中的配置通过自定义数据源模块会被解析成配置流;
2、其次kafka topic 会被解析成数据流,通过广播连接,配置流会被广播到每个数据流的TaskManager;
3、通过规则匹配模块对数据流和规则流进行匹配;
4、匹配到数据筛选出非Sentry中的数据分别发送到AlertManager实时告警、MySQL告警统计、kafka 实时监控
4、规则匹配模块设计

规则匹配模块核心使用的是Avaitor规则引擎表达式进行规则匹配,匹配的内容来源于:
1、数据流的JSON通过flattenAsMap转成map;
2、规则流中有效的Rule中获取得到的规则表达式。
5、规则设计
规则存储在MySQL中便于管理和修改,表结构如下:
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CgpZeA6H-1627199664546)(images/规则表结构.png)]](https://img-blog.csdnimg.cn/d1f8c2206d904778a11a85a8fe543c43.png)
其中,各字段解释如下:
id:为唯一的规则id,一个程序中id被认为是同一条规则;
name:规则的名称,用于规则的解释和告警输出;
exp:规则表达式,用于和数据流匹配;
update_time:规则变化时的更新时间;
create_time:规则的创建时间;
is_valid:是否有效规则,匹配的时候只使用有效的规则;
app_type:应用类型,只有当数据流中的ap_type和程序启动时的app.type 相同时才有可能匹配成功。
建表语句为:
CREATE TABLE IF NOT EXISTS `flink`.`flink-alert-rule123` (
`id` int(16) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`name` varchar(255) DEFAULT NULL COMMENT '规则名称',
`exp` varchar(1020) DEFAULT NULL COMMENT '规则表达式',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新规则时间',
`create_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '创建规则时间',
`is_valid` int(1) NOT NULL COMMENT '规则是否有效,无效不会告警',
`app_type` varchar(255) NOT NULL COMMENT '规则适用的应用类型,值必须是AppType的枚举值,忽略大小写',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
6、规则表达说明
1、规则表中的exp字段用来存储规则表达式,用aviator表达式引擎执行JSON是否满足表达式,返回boolean值,
2、规则的最简单格式为:key 比较运算符 value
key 是要匹配的JSON中的全路径key,不支持特殊字符,支持. 比较运算符 可以是 == 、!= 、< 、> 等参考:https://www.yuque.com/boyan-avfmj/aviatorscript/lvabnwvalue 是数字或者字符串,包含特殊字符用'' 包裹
3、规则的例子:
{
"common_data":{
"appPackage":"ltd.qisi.sotasupportapp",
"appVersion":"3.03.01.000",
"collectedTime":1625240289781,
"behaviorId":"50026003",
"qisiuiVersion":"0.2.02",
"uid":"1924427992",
"regionCode":"659001",
"eventName":"mock",
"vin":"MOCK1TELWMOMZRQAWO",
"hardwareVersion":"3.03.01.000",
"carseries":"E115",
"pdsn":"47556519116431",
"displayId":"0"
},
"gather_data":{
"key1":"value5",
"key2":"69",
"key3":"0"
}
}
- 告警规则
common_data.appPackage == 'ltd.qisi.sotasupportapp' 表示common_data.appPackage 字段等于’td.qisi.sotasupportapp
7、输出业务告警数据格式设计
1、车机端告警统计格式
CREATE TABLE `flink`.`flink-alert-data` (
`app_package` varchar(255) comment 'app包名'
,`collected_time` bigint(16) comment '数据'
,`behavior_id` varchar(255) comment ''
,`qisiui_version` varchar(255) comment ''
,`uid` varchar(255) comment '用户id'
,`region_code` varchar(255) comment ''
,`os_version` varchar(255) comment ''
,`event_name` varchar(255) comment ''
,`vin` varchar(255) comment 'vin码'
,`hardware_version` varchar(255) comment ''
,`carseries` varchar(255) comment ''
,`pdsn` varchar(255) comment ''
,`display_id` varchar(255) comment '屏幕id[ 0:主控屏;1:副驾屏;2:左后排屏;3:右后排屏;-1:未知]'
,`rule_name` varchar(255) comment ''
,`rule_id` varchar(255) comment ''
,`rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2、云端告警统计格式
CREATE TABLE `flink`.`flink-cloud-alert-data` (
`microservice` varchar(255) comment ''
,`reqPath` varchar(255) comment ''
,`clicnetIP` varchar(255) comment ''
,`resultCode` varchar(255) comment ''
,`createDate` varchar(255) comment ''
,`ctime` varchar(255) comment ''
,`rule_name` varchar(255) comment ''
,`rule_id` varchar(255) comment ''
,`rule_exp` varchar(255) comment ''
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
8、AlertManager告警模块设计
对接AlertManager的模块为ISendService 的实现类,通过调用void send(AlertManagerData data) 方法把数据发送的出去,其其实类会调用AlertManager的post请求发送json数据,请求的基本格式为:
curl -XPOST http://localhost:9093/api/v1/alerts -d '
[
{
"labels": {
"alertname": "DiskRunningFull",
"dev": "sda1",
"instance": "中文测试",
"route": "WEBHOOK"
},
"annotations": {
"info": "The disk sda1 is running full",
"summary": "please check the instance example1"
},
"Source": {
"link": "http://www.baidu.com"
}
}
]
'
三、提交 yarn 命令
flink任务的执行流程为:
1、mvn打包;
$ mvn clean package
2、把打包好的jar包上传到hdfs,路径为 /flink/jobs/;
$ hadoop fs -rm /flink/jobs/flink-AlertManager-v1.1.jar
$ hadoop fs -put ./flink-AlertManager-v1.1.jar /flink/jobs/
3、提交yarn-applicationn
$ flink run-application -t yarn-applicationn -Dyarn.application.name="Flink Alert System" hdfs:///flink/jobs/flink-AlertManager-v0.1.jar MysqlAndAlertManagerSink --app.type CLIENT_EVENT
必填参数:
- 执行的class类 , MysqlAndAlertManagerSink
- 应用类型 --app.type AppType的枚举值,忽略大小写
执行任务可选的参数:
1、Mysql 可选参数
--mysql.conf.file alert/mysql.properties
--mysql.host sit.dbaas.private
--mysql.port 13500
--mysql.db flink
--mysql.table flink.flink-alert-rule
--mysql.username tsp
--mysql.passwd TspMysql2020!
2、AlertManager 可选参数
--am.route <WEBHOOK、MAIL>
--am.host 10.6.215.39
--am.port 9093
3、Kafka 可选参数
--kafka.sink.conf.file alert/producer.properties
--kafka.source.conf.file alert/kafka.properties
--kafka.sink.topic sentry-sink-topic-test
源代码:
https://gitee.com/zhangjian-eng/flink-real-time-rule-alert
|