IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息 -> 正文阅读

[大数据]9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息

大家好,我是君哥。

引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。

1 RocketMQ 简介

RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server 进行集群注册管理和保存元数据。

2 消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

  • Producer 发送消息

  • Broker 保存消息

  • Consumer 消费消息

  • Broker 主从切换

维度 1:同步发送,代码如下:

public?void?send()?throws?Exception?{
????String?message?=?"test?producer";
????Message?sendMessage?=?new?Message("topic1",?"tag1",?message.getBytes());
????sendMessage.putUserProperty("name1","value1");
????SendResult?sendResult?=?null;

????DefaultMQProducer?producer?=?new?DefaultMQProducer("testGroup");
????producer.setNamesrvAddr("localhost:9876");
????producer.setRetryTimesWhenSendFailed(3);
????try?{
????????sendResult?=?producer.send(sendMessage);
????}?catch?(Exception?e)?{
????????e.printStackTrace();
????}
????if?(sendResult?!=?null)?{
????????System.out.println(sendResult.getSendStatus());
????}
}

同步发送会返回 4 个状态码:

  • SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功

  • FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。

  • FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。

  • SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。

消息重试时,消费端一定要做好幂等处理。

维度 2:异步发送,代码如下:

public?void?sendAsync()?throws?Exception?{
????String?message?=?"test?producer";
????Message?sendMessage?=?new?Message("topic1",?"tag1",?message.getBytes());
????sendMessage.putUserProperty("name1","value1");

????DefaultMQProducer?producer?=?new?DefaultMQProducer("testGroup");
????producer.setNamesrvAddr("localhost:9876");
????producer.setRetryTimesWhenSendFailed(3);
????producer.send(sendMessage,?new?SendCallback()?{
????????@Override
????????public?void?onSuccess(SendResult?sendResult)?{
????????????
????????}

????????@Override
????????public?void?onException(Throwable?e)?{
????????????// TODO 可以在这里加入重试逻辑
????????}
????});
}

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

维度 3:刷盘策略

  • 异步刷盘:默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

  • 同步刷盘:消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:

flushDiskType=SYNC_FLUSH

维度 4:Broker 多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。如下图:

?

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

  1. slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset;

  2. master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave;

  3. slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset;

  4. master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

维度 5:消息确认

Consumer 消费消息的代码如下:

public?void?consume()?throws?Exception?{
????DefaultMQPushConsumer?consumer?=?new?DefaultMQPushConsumer("testGroup");
????consumer.setNamesrvAddr("localhost:9876");
????consumer.setMessageModel(MessageModel.CLUSTERING);
????consumer.subscribe("topic1",?"tag1");
????consumer.registerMessageListener((MessageListenerConcurrently)?(msgs,?context)?->?{
????????try{
????????????System.out.printf("Receive?New?Messages:?%s",?msgs);
????????????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
????????}catch?(Exception?e){
????????????e.printStackTrace();
????????????return?ConsumeConcurrentlyStatus.RECONSUME_LATER;
????????}
????});
????consumer.start();
}

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

维度 6:Consumer 重试

Consumer ?消费失败,这里有 3 种情况:

  • 返回 RECONSUME_LATER

  • 返回 null

  • 抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。

注意:

  • Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。

  • 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。

  • Consumer 端一定要做好幂等处理。

其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:

int?count?=?((MessageExt)?msgs).getReconsumeTimes();
if?(count?>?2)?{
????//TODO?把消息写入本地存储
????System.out.println("重试次数超过3次");
????return?ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

维度7:事务消息

RocketMQ支持事务消息,整体流程如下图:

  1. Producer 发送 half 消息;

  2. Broker 先把消息写入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的队列,之后给 Producer 返回成功;

  3. Producer 执行本地事务,成功后给 Broker 发送 commit 命令(本地事务执行失败则发送 rollback);

  4. Broker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic;

  5. Consumer 拉取消息进行消费。

代码如下:

public?class?ProducerTransactionListenerImpl?implements?TransactionListener?{

????@Override
????public?LocalTransactionState?executeLocalTransaction(Message?msg,?Object?arg)?{
????????/**
?????????*?这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回
?????????*?LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
?????????*?Broker会回来查询,所以需要记录事务执行状态
?????????*/
????????return?LocalTransactionState.COMMIT_MESSAGE;
????}

????@Override
????public?LocalTransactionState?checkLocalTransaction(MessageExt?msg)?{
????????/**
?????????*?这里查询事务执行状态,根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或
?????????*?LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW,
?????????*?Broker会再次查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE
?????????*/
????????return?LocalTransactionState.UNKNOW;
????}
}

维度 8:消息索引

我们知道,RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一个索引文件,结构如下图:

?

查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。

在 Producer 发送消息时,可以指定一个 key,代码如下:

Message?sendMessage?=?new?Message("topic1",?"tag1",?message.getBytes());
sendMessage.setKeys("weiyiid");

这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。

维度 9:极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

3 总结

在一些特殊的业务场景,比如支付、银行核算等,需要确保消息不丢失,但是同时也要看到,消息不丢失的方案会大大降低 RocketMQ 的吞吐量,需要综合考虑。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-09 20:46:17  更:2022-02-09 20:47:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 21:34:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码