IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ如何确保消息不丢失 基于SpringBoot项目解决方式 -> 正文阅读

[Java知识库]RabbitMQ如何确保消息不丢失 基于SpringBoot项目解决方式

这个问题在消息队列中是一个很常见的问题,而消息的丢失又分为在哪几个阶段丢失:

  • 发送者发送消息到broker时丢失消息;
  • 交换机投递消息到队列时丢失消息;
  • 消费者端丢失消息。

准备工作

以下基于springboot项目演示RabbitMq。

创建一个交换机、一个队列,并将它们绑定。

@Configuration
@Slf4j
public class RabbitMqConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 队列:
     */
    @Bean
    public Queue queue01() {
        return new Queue("queue01", true, false, false);
    }

    /**
     * 交换机
     */
    @Bean
    public Exchange exchange01() {
        return new DirectExchange("exchange01", true, false);
    }

    /**
     * 绑定
     */
    @Bean
    public Binding binding() {
        return new Binding("queue01", Binding.DestinationType.QUEUE, "exchange01", "send.msg", null);
    }

}

发送端消息丢失

情况1:消息抵达broker之前丢失

在这里插入图片描述

解决:可以通过开启RabbitMQ的confirm机制来避免这种情况,当消息达到broker的时候,就回去触发ConfirmCallback中的confirm函数。

步骤:

  1. 在配置文件中添加配置:
# 开启发送者confirm机制
spring.rabbitmq.publisher-confirm-type=correlated

注意,在版本较低的RabbitMq中,spring.rabbitmq.publisher-confirm-type=correlated ====> spring.rabbitmq.publisher-confirms=true,高版本这种方式会提示已过时。

  1. 在配置类中实现ConfirmCallback对象,重写其confirm方法,并且将他注入到RabbitTemplate对象中
@Autowired
private RabbitTemplate rabbitTemplate;
 
@PostConstruct
public void initRabbitTemplate() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData.getId();
            if (!ack) {
                log.info("id为: {}的消息,未能抵达broker, 原因: {}", id, cause);
            } else {
                log.info("broker已接收id为: {}的消息", id);
            }
        }
    });
}
  • CorrelationData:封装了当前消息的一些信息,包括当前消息的id;
  • ack:消息是否成功被投递到broker中;
  • cause:投递失败的原因。
  1. 测试,向队列中发送一条消息
@GetMapping("/send")
public String sendMsg(@RequestParam("msg") String msg) {
    rabbitTemplate.convertAndSend("exchange01", "send.msg", msg, new CorrelationData(UUID.randomUUID().toString()));
    return "success";
}

这里发送消息携带了CorrelationData对象,如果未携带,则confirm方法是接收不到这个参数的。

在这里插入图片描述

confirm机制是异步的,在等待信道返回确认的同时,你还可以继续发送下一条消息。

情况2:消息抵达队列之前丢失

还有一种情况,也就是broker接收到消息了,但是在交换机向队列投递消息的过程中,由于某种原因,消息未能抵达队列,造成的消息丢失。
在这里插入图片描述

解决:创建消息抵达队列失败的回调函数

步骤:

  1. 配置文件中添加配置:
# 开启发送者消息抵达队列的确认机制
spring.rabbitmq.publisher-returns=true
# 只要消息未能抵达队列 就触发失败回调
spring.rabbitmq.template.mandatory=true
  1. 在配置类中实现returnedMessage中的returnedMessage方法,并且注入到RabbitTemplate对象中。
@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void initRabbitTemplate() {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData.getId();
            if (!ack) {
                log.info("id为: {}的消息,未能抵达broker, 原因: {}", id, cause);
            } else {
                log.info("broker已接收id为: {}的消息", id);
            }
        }
    });
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            // 该消息抵达的交换机
            String exchange = returned.getExchange();
            // 消息体的详细信息
            Message message = returned.getMessage();
            // 回复的状态码
            int replyCode = returned.getReplyCode();
            // 该消息使用的路由键
            String routingKey = returned.getRoutingKey();
            // 回复的文本内容
            String replyText = returned.getReplyText();
            log.info(
                    "exchange: {}, message: {}, replyCode: {}, routingKey: {}, replyText: {}",
                    exchange, message, replyCode, routingKey, replyText
                    );
        }
    });
}

注意,该方法只有消息抵达队列失败时才会调用。

接收端消息丢失

这种情况是在接收者接收到队列中的消息之后,因为某种情况导致该消息并没有被处理到,但是在队列中已经将该消息删除了,导致了消息丢失。

解决:开启手动ack模式,确保我们在处理完该消息后,手动ack告诉队列,这条消息处理完了,你可以删除了,否则如果出现故障导致该消息并没有被处理,那么还会保留在队列中,不会被丢失。

步骤:

  1. 配置文件添加配置:
# 开启手动ack模式 默认:auto
spring.rabbitmq.listener.direct.acknowledge-mode=manual
  1. 在接收到消息时,处理完该消息后,进行手动ack
@Component
public class MyRabbitListener {

    @RabbitListener(queues = "queue01")
    @RabbitHandler
    public void receiveMsg(String msg, Channel channel, Message message) throws IOException {
        System.out.println("received msg:" + msg);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("处理消息:" + msg);
            // 处理完毕 手动ack
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 出现异常 拒绝
            channel.basicNack(deliveryTag, false, true);
            //channel.basicReject(deliveryTag, true);
        }
    }

}
  • void basicAck(long deliveryTag , boolean multiple) throws IOException
    • deliveryTag:一个自增的消息唯一标识,当消息被拒绝重写投递到队列后,该消息对应的deliveryTag就会+1;
    • multiple:是否批量确认,将比当前消息deliveryTag值小的所有消息都进行确认。比如,我们当前存在deliveryTag值分别为:1、2、3的消息,当前消息的deliveryTag为4,如果开启当前消息的批量确认,那么会将deliveryTag为1,2,3,4这些消息都批量确认。
  • void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    • deliveryTag:同上;
    • multiple:是否批量;
    • requeue:是否重写入队。
  • void basicReject(long deliveryTag, boolean requeue)

basicNack和basicReject的区别:前者支持批量,将之前为拒绝的消息都批量拒绝;后者只拒绝当前消息。

测试一下,在处理消息之前手动添加一个异常,发送一条消息:
在这里插入图片描述

该消息会一直被重新投递到队列中。

在这里插入图片描述

除了以上的这些方式,也是可以使用RabbitMQ事务机制来解决消息丢失的问题,但是这种方式不推荐使用。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-09-24 20:42:45  更:2022-09-24 20:47:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 9:16:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码