IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 37 生产案例:基于RocketMQ进行订单库数据同步的消息乱序问题及解决方案 -> 正文阅读

[大数据]37 生产案例:基于RocketMQ进行订单库数据同步的消息乱序问题及解决方案

一、消息乱序问题

1. 消息乱序的场景

前面说过了消息丢失、消息重复、消息处理失败三个问题,接下来是关于消息乱序的问题及解决方案。

场景:消息乱序的发生场景

大数据团队在获取订单数据库中的全部数据后,需要将订单数据保存一份在自己的大数据存储系统中,比如HDFS、Hive、HBase等。

以上的过程在优惠后会基于Canal中间件去监听订单数据库的binlog,也就是通过一些增删改操作的日志,然后把这些binlog发送到MQ里去。

然后大数据系统从MQ里获取binlog,落地到自己的大数据存储中去;再由大数据系统对存储的数据进行计算得到数据报表即可。

?2. binlog消息乱序导致的数据指标错误

数据指标错误

基于Canal中间件同步后的数据生成的报表在与订单数据库中的订单数据作对比的时候,是有可能发生数据对不上的错误问题的。

比如:某个订单的库存量在订单库存数据库里是100,但是在报表上却是0。

binlog消息乱序问题

基于Canal同步中间件的使用过程中,订单数据库的binlog在通过MQ同步的过程中,是会发生消息乱序的现象。

简单来说,在订单系统更新数据库时有两条SQL语句:

insert into order values(xx,0)

update order set xxvalue=100 where id=xxx

订单系统的流程是先 insert插入一条数据,然后再update修改 id=xx对应的那条数据的xxvalue为100。

这两条语句写入binlog时是先载入insert语句,然后再写入update语句。

对应的在大数据系统从MQ获取出来binlog的时候,确实先获取到了 update 语句的binlog,然后才获取到 insert 语句的 binlog 。

也就是说,此时会先执行更新操作,但数据不存在会更新失败;接着执行插入操作。也就是插入一条字段值为0的订单数据进去,最终大数据系统存储的对应id的value就是0了。

也正是因为这个消息乱序的原因,导致了大数据存储中的数据都错乱了。

3. 为什么基于MQ来传输数据会出现消息乱序

说明:发生消息乱序不是绝对的,这涉及到MQ底层的原理。

原理解析:

在MQ系统中,可以给每个Topic指定多个MessageQueue,然后生产者写入消息的时候,就会把消息均匀分发给不同的MessageQueue的。

当写入binlog到MQ的时候,可能会把insert binlog写入到一个MessageQueue里去,update binlog写入到另一个 MessageQueue里去。

?而当大数据系统去获取binlog的时候,在部署多态机器组成一个Consumer Group,对于Consumer Group中的每台机器都会负责消费一部分MessageQueue的消息,所以可能一台机器从ConsumerQueue01中获取到 insert binlog,另一台从ConsumerQueue02中获取到了update binlog。

而导致消息乱序执行的关键就在于,两台机器上的大数据系统并行的去获取 binlog ,就有可能一个大数据系统先获取到了 update binlog 去执行了更新操作,此时存储中没有数据,没法更新。

然后另一个大数据系统再获取到 insert binlog 去执行插入操作,最终导致只有一个字段值为0的订单数据。

?4. 消息乱序总结

消息乱序的问题发生:原本有顺序的消息,因为存在分发到不同的MessageQueue中去,然后不同机器上部署的Consumer可能会用混乱的顺序从不同的MessageQueue里获取消息然后处理。

二、RocketMQ中解决消息乱序问题的方案

1.第一步:让属于同一个订单的binlog进入一个MessageQueue

想要解决消息的乱序问题,最根本的方法就是让一个订单的binlog进入到同一个 MessageQueue里去。

场景分析:

以前面的例子来说,一个订单,先后执行了 insert、update两条SQL语句,也就对应了2个binlog。我们根据订单id来进行判断,在往MQ里发送 binlog 的时候,根据订单 id 来判断一下,如果 id 相同,就必须保证它进入同一个 MessageQueue。

具体的可以采用取模的方法,比如有一个订单id是1100,那么它的两个binlog,我们用订单 id=1100 对MessageQueue的数量进行取模。如:MessageQueue有15个,订单 id=1100对15取模,结果为5。

那么就将订单id=1100的binlog,都进入位置为5的MessageQueue里去。

通过这个方法,就可以保证让一个订单的 binlog 都按照顺序进入到一个 MessageQueue中去。

?2.第二步:有序的去获取binlog

在保证一个订单的binlog都进入一个MessageQueue之后,还不够。

要知道,MySQL数据库的 binlog 一定是有顺序的。当我们从MySQL数据库中获取 binlog 的时候,也必须按照 binlog 的顺序来获取。

也就是说,当Canal从MySQL那里监听和获取 binlog ,那么当 binlog 传输到 Canal 的时候,也是有先后顺序的,先是 insert binlog ,然后是 update binlog。

?接着将 binlog 发送给 MQ 的时候,必须将一个订单的 binlog 都发送到一个 MessageQueue里去,而且发送过去的时候,也必须是按照顺序来发送的。

只有这样,最终才能让一个订单的 binlog 进入同一个 MessageQueue 的时候是有序的。

3. 第三步:Consumer有序处理一个订单的binlog

在MQ底层,一个Consumer可以处理多个 MessageQueue的消息,但是一个MessageQueue只能交给一个Consumer来进行处理,所以一个订单的 binlog 只会有序的交给一个 Consumer? 来进行处理。

在完成以上三步的方案后,大数据系统就可以获取到一个订单的有序的 binlog ,然后有序的根据 binlog 把数据还原到自己的存储中去。

4.消息处理失败了不能走重试队列

由于Consumer处理消息的时候,可能会因为底层存储挂了导致消息处理失败,之前的逻辑是可以返回RECONSUME_LATER状态。然后 broker 会过一会儿自动给我们重试。

但是现在要解决消息乱序问题,就不能这么做了。在有序消息的方案中,如果遇到消息处理失败的场景,就必须返回SUSPEND_CURRENT_QUEUE_A_MOMENT状态,它的意思是先等一会儿,一会儿再继续处理这批消息,而不能把这批消息放入重试队列去,然后直接处理下一批消息。

5.扩展

(1)如果处理消息一直异常,不会让消费者一直等待下去的,而是重试一定次数后会进入死信队列。

三、RocketMQ顺序消息机制的代码实现案例

1.让一个订单的 binlog 进入一个MessageQueue

首先要实现消息顺序,必须让一个订单的 binlog 都进入一个MessageQueue 中,代码如下:

?在上面的代码片段中,有两个关键点,一个是发送消息的时候传入一个MessageQueueSelector,在里面要根据订单id 和MessageQueue数量去选择这个订单id的数据进入哪个 MessageQueue。

同时在发送消息的时候除了带上消息自己以为,还要带上订单id,然后MessageQueueSelector会根据订单id去选择一个MessageQueue发送过去,这样的话,就可以保证一个订单的多个binlog都会进入一个MessageQueue中去。

2.消费者保证按照顺序来获取一个MessageQueue中的消息的代码实现

消费者按照顺序来获取一个MessageQueue中的消息的逻辑如下:

?在上述的代码中,使用的是 MessageListenerOrderly 这个东西,它里面有 Orderly 这个名称。也就是说,Consumer会对每一个 ConsumerQueue,都仅仅用一个线程来处理其中的消息。

比如对 ConsumeQueue01 中的订单id=1100的多个 binlog ,会交给一个线程来按照 binlog 顺序来一次处理。否则如果 ConsumeQueue01中的订单id=1100 的多个 binlog 叫个 ConsumeQueue 中的多个线程来处理的话,那还是会有消息乱序的问题。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:09:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年3日历 -2024/3/29 23:01:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码