IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RocketMQ的重复消费、死信队列、乱序问题 -> 正文阅读

[大数据]RocketMQ的重复消费、死信队列、乱序问题

为什么会重复消费

在这里插入图片描述
比如生产者发送消息的时候使用了重试机制,发送消息后由于网络原因没有收到MQ的响应信息,报了个超时异常,然后又去重新发送了一次消息。但其实MQ已经接到了消息,并返回了响应,只是因为网络原因超时了。
这种情况下,一条消息就会被发送两次。

在这里插入图片描述
在消费者处理了一条消息后会返回一个offset给MQ,证明这条消息被处理过了。
但是,假如这条消息已经处理过了,在返回offset给MQ的时候服务宕机了,MQ就没有接收到这条offset,那么服务重启后会再次消费这条消息。

如何解决重复消费

解决重复消费的关键就是引入幂等性机制,什么是幂等性机制呢?我们可以把它理解成,假如一个接口被重复调用,依然可以保证数据的准确性。
对于生产者重复发送消息到MQ这一过程,其实我们没有必要去保证幂等性,只要在消费者处理消息时保证幂等性就可以了。
这块其实就比较简单了,只要处理消息之前先根据业务判断一下本次操作是否已经执行过了,如果已经执行过了,那就不再执行了,这样就可以保证消费者的幂等性。
举个例子,比如每条消息都会有一条唯一的消息ID,消费者接收到消息会存储消息日志,如果日志中存在相同ID的消息,就证明这条消息已经被处理过了。

消息重试、延时消息、死信队列

解决完重复消费问题,我们来思考一种极端情况,比如某一时刻,消费者操作的数据库宕机了,这个时候消费者会发生异常,当然不能返回给MQ一个CONSUME_SUCCESS了,我们可以返回RECONSUME_LATER,他的意思是我现在没法处理这些消息,一会再来试试能不能处理。

简单来说,RocketMQ会有一个针对当前Consumer Group的重试队列,如果你返回了RECONSUME_LATER,MQ会把你的这批消费放到当前消费组的重试队列中,然后过一段时间重试队列中的消息会再次发送给消费者,默认可以重试16次,每次重试的间隔是不同的,这个时间间隔是可以配置的,默认配置如下:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

细心的小伙伴会发现,这个配置一共有18个时间,为什么最多重试16次,配置中却有18个时间呢,这里就要说到延时消息了。

上边的配置其实不是针对重试队列的,而是针对延时消息的,18个时间分别代表延迟level1-level18,延时消息大概流程如下:

1 所有的延迟消息到达broker后,会存放到SCHEDULE_TOPIC_XXX的topic下(这个topic比较特殊,对客户端是不可见的,包括使用rocketmq-console,也查不到这个topic)

2 SCHEDULE_TOPIC_XXX这个topic下存在18个队列,每个队列中存放的消息都是同一个延迟级别消息

3 broker端启动了一个timer和timerTask的任务,定时从此topic下拉取数据,如果延迟时间到了,就会把此消息发送到指定的topic下,完成延迟消息的发送

刚才我们说如果你返回了RECONSUME_LATER,消息就会进入重试队列,其实不完全准确。

当MQ接收到RECONSUME_LATER后,首先会完成消息的转换,把消息存到延时队列中,然后再根据消息的延时时间保存到重试队列中。

如果重试了16次之后依然无法处理,就会把这些消费放入死信队列。死信队列中的消息RocketMQ不会再做处理,这部分数据要怎么处理就要看我们的业务场景了,我们可以做一个后台线程去订阅这个死信队列,完成后续消息的处理。

在这里插入图片描述

消息乱序

每个Topic可以有多个MessageQueue,写入消息的时候实际上会平均分配给不同的MessageQueue。
然后假如我们有一个Consume Group,这个消费组中的每台机器都会负责一部分MessageQueue,那么就会导致消息的顺序乱序问题。
举个例子,生产者发送了两条顺序消息,先是insert,后是update,分别分配到两个MessageQueue中,消费者组中的两台机器分别处理两个队列的消息,这个时候是无法保证顺序性的,有可能会先执行update,后执行insert,导致数据发生错误。

那么如何解决消息乱序问题呢?

1.保证生产者 - MQServer - 消费者是一对一对一的关系(性能差)

2.一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 22:37:16  更:2022-03-15 22:38:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:57:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码