IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【RabbitMq05】发布确认高级 -> 正文阅读

[大数据]【RabbitMq05】发布确认高级

一、发布确认案例

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,
导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢

在这里插入图片描述

举个栗子

第一步、配置类

自动在rabbitmq控制台生成 交换机、队列

package com.lian.rabbitmq.c;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置类,发布确认
 */
@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //交换机通过routingKey 绑定队列
    @Bean
    public Binding queueBindingExchange(
            @Qualifier("confirmExchange") DirectExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

}

第二步、生产者

package com.example.demo.c;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //发送消息
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message);
        log.info("发送消息内容",message);
        System.out.println("发送消息内容"+ message);
    }
}

第三步、消费者

package com.example.demo.c;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到的队列消息",msg);
        System.out.println("接收到的队列消息"+ msg);
    }

}

正常是没有问题的,但是如果出现 交换机或队列出现问题,怎么办呢?
如果出现问题,消费者没有收到生产者发送的消息,就会触发回调结果

二、回调接口

package com.example.demo.c;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 实现 rabbit回调接口
 *
 * @PostConstruct该注解被用来修饰一个非静态的void()方法
 * 执行顺序
 * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //将本类 MyCallBack 注入到 RabbitTemplate.ConfirmCallback 接口里面
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     * 1、发消息 交换机接收到了, 回调
     *     1.1、correlationData 保存回调消息的ID及相关信息
     *     1.2、交换机收到消息, ack=true
     *     1.3、cause null
     * 2、发消息 交换机接收失败了,回调
     *     2.1、correlationData 保存回调消息的ID及相关信息
     *     2.2、交换机收到消息, ack=true
     *     2.3、cause 失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack){
            log.info("交换机已经收到id为:{}的消息",id);
        }else {
            log.info("交换机还没收到id为:{}的消息,由于原因:{}",id,cause);
        }
    }
}

三、交换机确认

回调方法里面需要的参数,在生产者里面

1、生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //新建回调方法参数类CorrelationData
        CorrelationData correlationData = new CorrelationData("1");
        //发送消息
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message,correlationData);
        log.info("发送消息内容",message);
        System.out.println("发送消息内容"+ message);
    }
}

2、配置文件

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

//生产者发送消息的确认类型是 交换机模式,开启交换机确认回调
spring.rabbitmq.publisher-confirm-type=correlated

3、配置文件类型详解

配置文件中关于 开启交换机确认回调方法 的解释

spring.rabbitmq.publisher-confirm-type=correlated

? NONE
禁用发布确认模式,是默认值

? CORRELATED(推荐)
发布消息成功到交换器后会触发回调方法

? SIMPLE
有两种效果
第一种效果和 CORRELATED 值一样,会触发回调方法
第二种效果在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或waitForConfirmsOrDie 方法,等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

4、测试(生产者故意写错交换机)

http://localhost:8080/confirm/sendMessage/hello

发送消息内容hello
交换机已经收到id为:1的消息
接收到的队列消息hello

如果生产者发送的交换机名称不正确

		//生产者发送消息,交换机名称不正确
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME+"1",
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message,correlationData);

返回报错结果

发送消息内容hello
交换机还没收到id为:1的消息,由于原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'confirm_exchange1' in vhost '/'
接收到的队列消息hello

四、回退消息

1、配置文件

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#生产者发送消息的确认类型是 交换机确认消息
spring.rabbitmq.publisher-confirm-type=correlated

#只要消费者没有接收到生产者的消息,就会回退给生产者
spring.rabbitmq.publisher-returns=true

2、回调方法

package com.example.demo.c;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 实现 rabbit回调接口
 *
 * @PostConstruct该注解被用来修饰一个非静态的void()方法
 * 执行顺序
 * Constructor(构造方法) -> @Autowired(依赖注入) -> @PostConstruct(注释的方法)
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //将本类 MyCallBack 注入到 RabbitTemplate.ConfirmCallback 接口里面
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机不管是否收到信息都会用回调方法
     * 1、发消息 交换机接收到了, 回调
     *     1.1、correlationData 保存回调消息的ID及相关信息
     *     1.2、交换机收到消息, ack=true
     *     1.3、cause null
     * 2、发消息 交换机接收失败了,回调
     *     2.1、correlationData 保存回调消息的ID及相关信息
     *     2.2、交换机收到消息, ack=true
     *     2.3、cause 失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack){
            log.info("交换机已经收到id为:{}的消息",id);
        }else {
            log.info("交换机还没收到id为:{}的消息,由于原因:{}",id,cause);
        }
    }

    /**
     * down 下来源代码后,参数就有名字了,方便理解
     * 回退消息例子是 交换机收到消息,但是队列没有收到消息,消费者无法接收到
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }

}

3、生产者故意写错 routingKey

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //新建回调方法参数类CorrelationData
        CorrelationData correlationData = new CorrelationData("1");
        //发送消息
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"1",
                message,correlationData);
        log.info("发送消息内容",message);
        System.out.println("发送消息内容"+ message);
    }
}

4、测试

http://localhost:8080/confirm/sendMessage/hello

发送消息内容hello
消 息 hello, 被交换机 confirm_exchange 退回,退回原因 :NO_ROUTE, 路 由 key:key11
交换机已经收到id为:1的消息

五、备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息
无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然
后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者
所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增
加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。如果既不想丢失消息,又不想增加生产者的
复杂性,该怎么做呢?前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些
处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。什么是备份交换机呢?备份
交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,
就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由
备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑
定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都
进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警

在这里插入图片描述

1、配置类

package com.example.demo.c;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置类,发布确认
 */
@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "key1";

    //--------------------------------------------------
    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";

    /**
     * 声明交换机
     * 确认交换机将无法发送的消息 转发给 备份交换机
     */
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return ExchangeBuilder
                .directExchange(BACKUP_EXCHANGE_NAME)
                .durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME)
                .build();
    }

    //声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    //交换机通过routingKey 绑定队列
    @Bean
    public Binding queueBindingExchange(
            @Qualifier("confirmExchange") DirectExchange confirmExchange,
            @Qualifier("confirmQueue") Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    //-----------------------------备份交换机------------------------------------

    /**
     * fanout 交换机: 只能统一发送给所有的队列,没有 routingKey,默认为空串
     * direct 交换机: 只能单独的通过 routingKey  绑定到指定的 队列
     * topic 交换机:等于 fanout 交换机 + direct 交换机
     * @return
     */
    //声明备份交换机(fanout扇出交换机)
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    //声明备份队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return new Queue(BACKUP_QUEUE_NAME);
    }
    //声明报警队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return new Queue(WARNING_QUEUE_NAME);
    }
    //备份队列绑定到备份交换机
    @Bean
    public Binding BackupQueueBindingBackupExchange(
            @Qualifier("backupExchange") FanoutExchange backupExchange,
            @Qualifier("backupQueue") Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }
    //报警队列绑定到备份交换机
    @Bean
    public Binding WarningQueueBindingBackupExchange(
            @Qualifier("backupExchange") FanoutExchange backupExchange,
            @Qualifier("warningQueue") Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

}

2、消费者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class WarningConsumer {

    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message){
        String msg = new String(message.getBody());
        log.info("报警发现不可路由消息: {}",msg);
    }
    
}

注意:原来的交换机要删除,因为原来的交换机增加了 可路由到 备份交换机的功能

3、生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //新建回调方法参数类CorrelationData
        CorrelationData correlationData = new CorrelationData("1");
        //发送消息
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY,
                message + "key1",
                correlationData);
        log.info("发送消息内容:{}",message + "key1");

        //新建回调方法参数类CorrelationData
        CorrelationData correlationData2 = new CorrelationData("2");
        //发送消息
        rabbitTemplate.convertAndSend(
                ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY + "2",
                message + "key12",
                correlationData2);
        log.info("发送消息内容:{}",message + "key12");

    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 19:19:01-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码