IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Rabbitmq——备份交换机 -> 正文阅读

[Java知识库]Rabbitmq——备份交换机

? ? ?前边讲解的死信交换机可以用来存储那些处理失败的消息,如果消息路由不通,无法进入队列,则死信交换机无法处理到,生产者通过发布高级确认可以感知消息是否投递成功,但只能通过手动处理路由不通的消息,这样在生产者部署在多台服务器时,手动处理容易出错。

? ? 备份交换机:为一个普通交换机添加一个”备胎“,当交换机接收到一条无法路由的消息时,就会把消息转发给备份交换机,有备份交换机来转发和处理,该交换机类型一般为”fanout“型,这样就可以把所有消息发送到与其绑定队列中去。

案例:一个生产者发送一条路由不通的消息,看看备份交换机如何处理,如下图:

1、配置文件信息,连接rabbitmq服务,同时开启发布确认高级和消息回退。


spring.rabbitmq.host=192.168.22.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=user
spring.rabbitmq.password=123435
#发布消息成功到交换机后会触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
#路由不通,回退消息给生产者
spring.rabbitmq.publisher-returns=true

2、声明交换机与队列,以及绑定对应交换机与队列

普通交换机:

confirm_exchange交换机(direct类型)——路由confirm——confirm_queue

备份交换机:

backup_exchange交换机(fanous类型)——warning_queue

backup_exchange交换机(fanous类型)——backup_queue

指定普通交换机的备份交换机:

通过设置alternate_exchange参数指定备份交换机

confirm_exchange——备份交换机——backup_exchange

@Configuration
public class ConfirmConfigs {

    //交换机,队列,路由名
    public final static String CONFIRM_EXCHANGE="confirm_exchange";
    public final static String CONFIRM_QUEUE="confirm_queue";
    public final static String CONFIRM_ROUTING_KEY="confirm";

    //备份队列,交换机,报警队列
    public final static  String BACKUP_EXCHANGE="backup_exchange";
    public final static  String BACKUP_QUEUE="backup_queue";
    public final static  String WARNING_QUEUE="warning_queue";

    //声明交普通换机,队列
    @Bean
    public DirectExchange confirmExchange(){
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true)
                //普通交换机声明其备份交换机
                .withArgument("alternate-exchange",BACKUP_EXCHANGE).build();
    }
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定交换机和队列
    @Bean
    public Binding queueBindExchange(@Qualifier("confirmExchange") DirectExchange exchange,
                                     @Qualifier("confirmQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }

    //备份交换机,备份队列,报警队列声明即绑定
    @Bean
    public FanoutExchange backupExchange(){
        return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE).build();
    }

    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE).build();
    }

    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE).build();
    }

    @Bean
    public Binding backExchangeBindbackQueue(@Qualifier("backupExchange") FanoutExchange fanoutExchange,
                                             @Qualifier("backupQueue") Queue queue){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    @Bean
    public Binding warningQueueBindBackQueue(@Qualifier("backupExchange") FanoutExchange exchange,
                                             @Qualifier("warningQueue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange);
    }
}

3、添加组件,继承消息确认回调接口和消息回退接口

confirm(消息id,是否被交换机接收,失败原因)方法,确定消息是否被交换机接收。

returnMessage(消息体,失败编码,失败原因,交换机,路由)方法,回退消息给生产者。

@Component
@Slf4j
public class MyCallBacks implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
   @Autowired
   private RabbitTemplate rabbitTemplate;

    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id=correlationData!=null?correlationData.getId():"";
        if (b){
            //消息成功发送
            log.info("交换机已经收到id为:{}的消息",id);
        }else {
            //消息发送失败
            log.info("交换机未收到id为:{}的消息,收不打原因为:{}",id,s);
        }
    }

    @Override
    public void returnedMessage(Message message, int replayCode, String replayText, String exchange, String routingKey) {
        log.info("消息:{},被{}交换机退回,路由为:{},被退回原因为:{}",new String(message.getBody()),exchange,routingKey,replayText);
    }
}

4、生产者发送消息

convertAndSend(交换机,路由,消息,消息编号即相关信息)方法发送消息


@Slf4j
@RestController
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送消息
    @RequestMapping("/producer/{message}/{routing}/{id}")
    public void send(@PathVariable String message,
                       @PathVariable String routing,
                       @PathVariable String id){
        //存放消息id和相关信息
        CorrelationData correlationData = new CorrelationData(id);

        rabbitTemplate.convertAndSend(ConfirmConfigs.CONFIRM_EXCHANGE,
                routing, message,correlationData);

        log.info("生产者发送消息内容为:'{}',消息id为:{},发送路由为:{}",message,id,routing);
    }
}

?5、报警消费者,普通消费者,备份消费者

报警消费者:用于提醒生产者有消息路由不通

备份消费者:处理路由不通消息

@Component
@Slf4j
public class Consumers {

    //普通消费者
    @RabbitListener(queues = ConfirmConfigs.CONFIRM_QUEUE)
    public void consumer(Message message){
        log.info("consumer收到消息:'{}',路由为:{}",
                new String(message.getBody()), message.getMessageProperties().getReceivedRoutingKey());
    }

    //报警消费者
    @RabbitListener(queues = ConfirmConfigs.WARNING_QUEUE)
    public void warningConsumer(Message message){
        log.info("发现不可路由的消息:'{}'",new String(message.getBody()));
    }

    //备份消费者
    @RabbitListener(queues = ConfirmConfigs.BACKUP_QUEUE)
    public void backupConsumer(Message message){
        log.info("备份消费者已经收到不可路由消息:'{}',马上进行处理",new String(message.getBody()));
    }

}

6、测试,发送路由为:confirms的消息:记得吃饭哦,结果如下:

?

报警消费者发现消息路由不通,备份消费者进行处理,测试成功?

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-21 20:35:37  更:2022-03-21 20:36:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 9:19:14-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码