项目结构采用了star方法,Situation:概括性的总结业务背景和挑战;Task:介绍你负责的任务已经需要达成的目标;Action:项目中你采取的关键行动;Result:项目落地后的实际效果。本文是项目实战第六讲:协议变更联动商品变更
1、项目背景
项目背景:商品销量、成交记录是采购人在采购商品过程,非常重要的参考信息,而目前商家重新签订协议后,需要商家重新签订协议,并基于新的协议重新发布新的协议商品,导致了销量和成交记录数据丢失。
面临的挑战:** ①数据量大 协议与商品是一对多的关系,按线上数据,一份协议最多可能对应30万商品;
②影响面广 涉及四个上下游团队;
2、主要技术
要实现的功能: (1)协议状态变更联动商品变更; (2)协议迁移联动商品迁移; (3)使用批量任务避免数据大量变更对系统稳定性的影响; (4)使用策略模式提升系统可拓展性。
使用的技术: SSM+Maven+Springboot+Git+Dubbo+RocketMQ+Redis+Apollo+ ElasticSearch
- ①主要用maven构建项目模块和管理项目,控制jar包版本;
- ②用Dubbo+zookeeper搭建分布式SOA架构;
- ③查询商品详情接口ES,提升系统查询性能;
- ④使用批量任务(Redis+MQ)避免大量数据变更对系统稳定性的影响,用RocketMQ解耦任务发送和任务处理逻辑;
- ⑤用Redis实现分布式锁+记录任务数量;
- ⑥使用RateLimiter + Apollo动态调整限流数量;
- ⑦使用策略模式提升系统可扩展性
3、项目职责 **
- ①使用批量任务(Redis + MQ)避免数据异步处理该变更任务,能支持普通/顺序/延迟任务;
- ②使用策略模式 + 模板模式提升系统可扩展性;
- ③查询商品详情接入ES,提升系统查询性能;
- ④使用RateLimiter + Apollo动态调整限流数量;
- ⑤用Redis实现分布式锁+记录任务数量
4、项目实现
总体的流程图如图1所示:
总体调度图如图2所示:
亮点1:使用消息队列制作任务处理器
- 使用MQ原因:业务关联的数据量大,需要做异步处理
- 业务逻辑:表结构 表1 主任务执行表
字段 | 类型 | 含义 | 备注 |
---|
id | bigint | 主键id | | biz_id | varchar | 业务id | | param | varchar | 子任务执行参数 | | job_type | varchar | 消息类别 | 普通消息、顺序消息、延迟消息 | mq_tag | varchar | 消息队列tag | | total_jobs | int | 子任务总数 | | sub_job_size | int | 子任务每次执行数量 | | status | int | 任务执行状态 0-收集任务中 1-排队中 2-进行中 3-已完成 4-待执行 | | failed_jobs | int | 子任务失败数 | | success_jobs | int | 子任务成功总数 | | expect_execute_time | datetime | 期待执行时间 | | sub_job_ids_json | mediumtext | 子任务ID集合json | |
表2 子任务执行表
字段 | 类型 | 含义 | 备注 |
---|
id | bigint | 主键 | | batch_id | bigint | 批次id | 表一的外键 | sub_job_id | bigint | 子任务id | | error_msg | long | 失败原因 | | status | int | 执行状态 0-未执行 1-执行成功 2-执行失败 | |
- 使用了redis来统计任务数量
- 作用:查询执行进度时,先查redis,可以提升系统性能
- redis写
- ①子任务是否推送结束
- ②执行成功数
- ③执行失败数
- ④子任务执行总数 推送结束后重置子任务执行总数
- ⑤子任务推送数, 防止数据统计不准
- redis读
Action1:限流逻辑是怎么做的?
- 使用了Google guava 的 rateLimiter + Apollo配置
- 限流时机:在消息接收方的第一行代码中执行
rateLimiter.acquire(); ,并将限流策略配置在Apollo中,可以按线上情况动态调整; - 限流情况:QPS按压测情况来确定,目前线上为单机1,也就是每秒执行商品变更数为 1 * 8(机器数) * 20(子任务单批执行数);
- 限流原理: 令牌桶,可以参考这篇文章 todo
Action2:如何精确地统计数据?
- 消息会重复发送,由于扩容和缩容、应用重启会导致MQ重试,统计的消息数量不一定准确。因此,如果当子任务全部执行完成后,会重新取db数据,判断是否和redis中数据一致,不一致,以db数据为准
- 子任务防重复提交
Action3:如何提升任务处理器的拓展性?
- 在子任务执行过程中,商品受上游业务的变化,有多种状态的变更,使用了策略模式来定义各个状态,在各个策略中执行逻辑,方便扩展。
- 定义BizChangeHandler 业务变更处理器
- 实现类:①联动商品上架处理器(无需完整校验流程)
- ②联动商品上架处理器(需完整校验流程)
- ③联动商品下架处理器
- ④联动商品冻结处理器
- 在MQ任务处理逻辑中,使用了模板模式,目前提供了这三个拓展点供业务方实现
- ①限流逻辑
- ②子任务处理消费逻辑
- ③联动商品下架处理器
Action4:如何使用任务处理器实现普通消息、有序消息和定时消息?
-
普通消息 or 有序消息
- 创建主任务或子任务推送时,增加一个标识,告知程序这个消息是有序的。在通知子任务执行时,如果判断该字段为顺序消息,使用
sendOrderMsg(String topic, String tag, String key, String msgBody, String shardingKey) ,否则:sendCommonMsg(String topic, String tag, String key, String msgContent) -
定时消息:告知消息是定时的
- 结合使用了ElasticJob
- 主表中有个字段
expect_execute_time 期待执行时间 sub_job_ids_json 子任务ID集合json,可以得知当前时间点需要执行的子任务,然后发送普通消息处理子任务。
亮点2:大数据量的处理,一份协议最多挂30万个商品,变更协议,如何做到系统稳定呢?
- 使用了批量任务(Redis+MQ)避免大量数据变更对系统稳定性的影响,用RocketMQ解耦任务发送和任务处理逻辑;
Action1:使用ES来快速查找商品
- 由于商品数量达到了2亿,如果统计任务时,查询ES,而不是直接查询db,性能会好很多;
- 使用ES能快速查找到需要执行变更操作的商品。并加了重试机制,防止高峰期,查询ES超时的问题。
Action2:在这个项目中的收获?
- ①创新点:使用MQ + redis 实现了任务处理器,支持普通/顺序/延迟消息;
- ②拓展性:使用了模板模式 + 策略模式提升系统稳定性;
- ③大数据量:对大量商品变更接入ES,提升运行效率
Action3:可以优化的地方?
- ①优化任务处理表。目前统计的维度为子任务,数据量会膨胀,后续通过调整任务粒度来减少数据量
5、项目结果
1:QPS:目前是单机QPS 1,共部署了8台机器,子任务每批执行的商品数量为20,因此,整体是每秒变更160条商品。 2:使用量:需要统计下日志或db todo
学而不思则罔,思而不学则殆 – 《论语》
|