IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ死信队列 -> 正文阅读

[大数据]RabbitMQ死信队列

含义

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有
后续的处理,就变成了死信,有死信自然就有了死信队列。

出现场景

“死信”是 RabbitMQ 中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时 requeue 属性被设置为 false。
  2. 消息在队列的存活时间超过设置的 TTL(消息过期)时间。
  3. 消息队列的消息数量已经超过最大队列长度。

案例一:消息过期

架构

在这里插入图片描述

组件配置

@Configuration
public class RabbitDeadLetterConfig {
    public static final String DEAD_QUEUE_NAME = "dead_queue";
    public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
    public static final String NORMAL_QUEUE_NAME = "normal_queue";
    public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
    public static final String DEAD_ROUTING_KEY = "dead_routing_key";
    public static final String NORMAL_ROUTING_KEY = "normal_routing_key";

    @Bean
    public Queue normalQueue() {
        Map<String, Object> param = new HashMap<>();
        // 正常队列出现死信场景时发送到死信交换机,因此需要在死信队列指定死信交换机和路由信息
        param.put("x-dead--letter-exchange", DEAD_EXCHANGE_NAME);
        param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return new Queue(NORMAL_QUEUE_NAME, true, false,false, param);
    }

    @Bean
    public Queue deadQueue() {
        return new Queue(DEAD_QUEUE_NAME);
    }

    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange(NORMAL_EXCHANGE_NAME);
    }

    @Bean
    public DirectExchange deadExchange() {
        return new DirectExchange(DEAD_EXCHANGE_NAME);
    }

    @Bean
    public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
    }

    @Bean
    public Binding deadBinding(Queue deadQueue, DirectExchange deadExchange) {
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
    }
}

生产者

@Component
@Slf4j
public class DeadLetterSender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String msg1 = "dead msg which ttl is 10s";
        String msg2 = "dead msg which ttl is 1000s";
        // 过期时间设置短点,期望其进入死信队列
        rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
                RabbitDeadLetterConfig.NORMAL_ROUTING_KEY, msg1, (message) -> setExpireTime(message, "10000"));
        // 过期时间设置长点,期望正常队列便能接收到
        rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
                RabbitDeadLetterConfig.NORMAL_ROUTING_KEY, msg2, (message) -> setExpireTime(message, "1000000"));
    }

    private Message setExpireTime(Message message, String expireTime) {
        MessageProperties messageProperties = message.getMessageProperties();
        messageProperties.setExpiration(expireTime);
        return message;
    }
}

消费者

正常消费者

@Component
@RabbitListener(queues = RabbitDeadLetterConfig.NORMAL_QUEUE_NAME)
@Slf4j
public class NormalConsumer {
    @RabbitHandler
    public void consume(String msg) {
        log.info("normal consumer received msg:" + msg);
    }
}

死信消费者

@Component
@RabbitListener(queues = RabbitDeadLetterConfig.DEAD_QUEUE_NAME)
@Slf4j
public class DeadConsumer {
    @RabbitHandler
    public void consume(String msg) {
        log.info("dead consumer received msg:" + msg);
    }
}

测试

@GetMapping("/dead")
    public void deadLetter() {
        deadLetterSender.send();
    }

在这里插入图片描述
首先将正常消费者注释掉,启动程序,调用接口触发生产者生产消息。其中一条消息因有过期时间,当它在10s内不能消费时便会过期,此时看死信消费者能否收到消息
在这里插入图片描述
死信消费者成功接收到消息
取消掉正常消费者的注释启动程序,此时另外一条消息被正常消息者消费。
在这里插入图片描述

案例二:队列长度已满

组件配置

重新定义一个队列,将此队列绑定到案例一的交换机,其他组件仍使用案例一的配置

@Bean
    public Queue sizeQueue() {
        Map<String, Object> param = new HashMap<>();
        // 正常队列出现死信场景时发送到死信交换机,因此需要在死信队列指定死信交换机和路由信息
        param.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        param.put("x-max-length", 5);
        return new Queue(SIZE_DEAD_QUEUE_NAME, true, false,false, param);
    }

    @Bean
    public Binding sizeDeadBinding(Queue sizeQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(sizeQueue).to(normalExchange).with(SIZE_ROUTING_KEY);
    }

生产者

@Component
@Slf4j
public class SizeDeadLetterSender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        for (int i = 1; i < 11; i++) {
            String msg = "size dead msg" + i;
            rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
                    RabbitDeadLetterConfig.SIZE_ROUTING_KEY, msg);
        }
    }
}

消费者

重新定义的队列不进行消费,好使队列长度塞满。

测试

死信队列消费者能收到5条消息,证明这5条是队列塞满后进入死信队列的
在这里插入图片描述
剩余的5条仍在新定义的队列中
在这里插入图片描述

案例三:消息被拒绝

组件配置

同样新增一个队列绑定到案例一的交换机上,其他组件配置不变

    @Bean
    public Queue refusedQueue() {
        Map<String, Object> param = new HashMap<>();
        // 正常队列出现死信场景时发送到死信交换机,因此需要在死信队列指定死信交换机和路由信息
        param.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
        param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return new Queue(REFUSED_QUEUE_NAME, true, false,false, param);
    }

    @Bean
    public Binding refusedBinding(Queue refusedQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(refusedQueue).to(normalExchange).with(REFUSE_ROUTING_KEY);
    }

生产者

发送10条消息到信新队列

@Component
@Slf4j
public class RefusedDeadLetterSender {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send() {
        for (int i = 1; i < 11; i++) {
            String msg = "refuseMsg" + i;
            rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
                    RabbitDeadLetterConfig.REFUSE_ROUTING_KEY, msg);
        }
    }
}

消费者

如果是指定消息拒绝,其他消息正常消费

@Component
@Slf4j
public class RefuseMsgConsumer {
    @RabbitListener(queues = RabbitDeadLetterConfig.REFUSED_QUEUE_NAME)
    public void consume(Message msg, Channel channel) throws IOException {
        String msgStr = new String(msg.getBody());
        if ("refuseMsg10".equals(msgStr)) {
            channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
            log.info("{} refused by refuse consumer", msgStr);
        } else {
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
            log.info("{} consume by refuse consumer", msgStr);
        }
    }
}

特别说明

消费者默认是自动确认的,这里很明显要开启消费者手动确认模式。
在配置文件中添加如下代码:

# 开启消费者手动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

测试

在这里插入图片描述
指定消息被拒绝进入了死信队列,其余消息被正常消费

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-11 12:28:56  更:2021-08-11 12:30:02 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 20:14:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码