IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ消息的可靠性投递 -> 正文阅读

[大数据]RabbitMQ消息的可靠性投递

在使用RabbitMQ的过程,如何保证消息100%不丢失,可靠的抵达队列,然后又100%可靠的被消费端消费呢??保证消息的可靠投递在实际生产中非常重要

今天就来说一下RabbitMQ的消息可靠性投递,先看一下图

从图上可以看到从发送消息到消息最后的消费的链路是:生产者把消息发送到Brocker(Brocker把消息给对应的交换机),交换机把消息投递给对应的队列,消费者监听对应的队列进行消费

所以消息的可靠投递分为了两大内容:发送端的确认(也就是p到b和e到q)和消费端的确认(也就是q到c)

一、发送端确认

发送端的确认又分为了消息到达Brocker的确认和消息到达队列的确认

1、消息到达Brocker的确认

当消息安全抵达Brocker后,就会自动的触发一个回调,这个回调就是confirmCallBack?

想要触发confirmCallBack回调,需要对项目进行配置:

1.1修改yml配置文件

# 开启发送端消息抵达Broker确认
spring:
    rabbitmq:
        publisher-confirm-type: correlated

1.2对RabbitTemplate进行配置

/**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(可以认为这是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
            if(ack){
                System.out.println("消息发送到Brocker成功");
            }else {
                System.out.println("消息发送到Brocker失败,记录日志或写入数据库...");
            }
        });

?测试发送一条消息

@Test
    public void testSendMsg(){
        rabbitTemplate.convertAndSend("hello-java-exchange","hello.java","this is msg");
    }

看下结果,可以看到结果已经打印出来了。

2、消息到达队列的确认

消息没有正确抵达队列的话就会触发另外一个回调,就是returnCallBack

2.1修改yml配置文件

spring:
    rabbitmq:
        #开启发送端消息抵达Queue确认
        publisher-returns: true
        #只要消息未抵达Queue,就会异步发送优先回调returnConfirm
        template:
            mandatory: true

2.2对RabbitTemplate进行配置

/**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         */
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("消息投递失败:["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });

测试发送一条消息,故意把路由键写错,让消息投递到队列失败

@Test
    public void testSendMsg(){
        rabbitTemplate.convertAndSend("hello-java-exchange","abc123","this is msgqqqqqqqq");
    }

看下结果,可以看到结果已经打印出来了。

可以看到输出的结果

消息投递失败:[(Body:'"this is msgqqqqqqqq"' MessageProperties 
[headers={__TypeId__=java.lang.String}, contentType=application/json, 
contentEncoding=UTF-8, contentLength=0, 
receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
==>replyCode[312]==>replyText[NO_ROUTE]==>exchange[hello-java-exchange]==>routingKey[abc123]

二、消费端确认

默认情况下,消费端是自动ack的,我们需要把它改成手动ack确认

1、添加配置

#开启手动确认消息
rabbitmq:
  listener:
    simple:
      acknowledge-mode: manual

2、代码中使用

    @RabbitHandler
    public void doSomething(XxxxtMessage xxxxtMessage , Message message, Channel channel) throws IOException {
        log.info("监听到消息:doSomething消息内容:{}", xxxxtMessage );
        long msgTag = message.getMessageProperties().getDeliveryTag();
        boolean flag = xxxService.xxx(xxxxtMessage );
        try {
            if (flag) {
                //手动确认消息消费成功
                channel.basicAck(msgTag, false);
            }else {
                //消费失败后,重新投递入队
                channel.basicReject(msgTag,true);
                log.error("xxx失败 flag=false,{}",xxxxtMessage );
            }
        } catch (IOException e) {
            log.error("xxx异常:{},msg:{}",e,xxxxtMessage );
            //异常,重新投递入队
            channel.basicReject(msgTag,true);
        }
    }

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:18:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:48:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码