IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> rabbitmq的死信队列 -> 正文阅读

[大数据]rabbitmq的死信队列

死信队列的概念:

用来存放变成死信的消息的队列

死信产生的途径:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
  2. 消息在队列的存活时间超过设置的TTL时间
  3. 消息队列的消息数量已经超过最大队列长度

如何配置死信队列

  1. 配置死信队列,配置死信交换机,将死信队列绑定到死信交换机上,指定路由key
  2. 配置业务队列(同时绑定对应的死信交换机和路由key),配置业务交换机,再将业务队列绑定到业务交换机

在需要使用死信的业务队列中配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。
也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,然后死信的产生途径不是客户端发送的,而是由上面说的几种方式产生的。

用法

首先消息消费端改成 消息手动确认

# 开启手动确认消息,这样订阅者必须要手动 ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息发送端:

	String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead_letter_key";

     // 声明死信队列A
     @Bean("deadLetterQueueA")
     public Queue deadLetterQueueA(){
          return new Queue("deadLetterQueueA");
     }

     // 声明死信Exchange
     @Bean("deadLetterExchange")
     public DirectExchange deadLetterExchange(){
          return new DirectExchange("deadLetterExchange");
     }

     // 声明 死信队列A 与 死信交换机 的绑定关系
     @Bean
     public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                       @Qualifier("deadLetterExchange") DirectExchange exchange){
          return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
     }

     // 声明业务Exchange
     @Bean("businessExchange")
     public FanoutExchange businessExchange(){
          return new FanoutExchange("businessExchange");
     }

     // 声明业务队列A
     @Bean("businessQueueA")
     public Queue businessQueueA(){
          Map<String, Object> args = new HashMap<>(2);
          // x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
          args.put("x-dead-letter-exchange", "deadLetterExchange");
          // x-dead-letter-routing-key  这里声明当前队列的死信路由key
          args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
          return QueueBuilder.durable("businessQueueA").withArguments(args).build();
     }

     // 声明 业务队列A 与 业务交换机 的绑定关系
     @Bean
     public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                     @Qualifier("businessExchange") FanoutExchange exchange){
          return BindingBuilder.bind(queue).to(exchange);
     }

消息接收端(消费端)

	// 业务 + 死信队列
    @RabbitListener(queues = "businessQueueA")
    public void receivebusinessQueueA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        if(msg.equals("1")){
            // 业务处理
            System.out.println("收到业务消息:" + msg);
            // 如果已经配置了(默认)自动确认,就不要在手动确认了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }else{
            // 模拟这种场景下要将他放进死信队列
            System.out.println("消息消费异常,放进死信队列");
            // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    @RabbitListener(queues = "deadLetterQueueA")
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

发送消息的接口

	@GetMapping("/send6")
    @ResponseBody
    public String send6(String index){
        // 业务逻辑代码

        // 发送消息到消息队列
        // template.convertAndSend("businessExchange","","1");
        template.convertAndSend("businessExchange","",index);
        return "发送消息到队列成功";
    }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-04 19:57:25  更:2021-07-04 19:57:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/21 11:55:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码