IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMQ——死信队列 -> 正文阅读

[Java知识库]RabbitMQ——死信队列

1. 死信队列

1.1 概念

死信就是无法被消费的消息,消费者从队列中取消息时,由于某些特定原因导致消息无法被消费,即没有了后续的处理,就变成了死信继而有了死信队列。

1.2 应用场景

可以保证消息不会消失,如果消费者在进行消费时发送异常,可以先放到死信队列中,等后面运行环境好了之后再进行消费。

1.3 死信来源

  1. 消息TTL过期;
  2. 队列达到最大长度;
  3. 消息被拒绝。

1.4 死信实战

架构图:

  1. 正常情况下使用normal交换机绑定给normal队列C1进行消费;
  2. 出现异常情况就转发给 dead交换机绑定给dead队列让C2进行消费。
    在这里插入图片描述

1.4.1 消息TTL过期

测试过程:

  1. 开启消费者1构建交换机队列RoutingKey之间的关系;
  2. 然后关闭消费者1;
  3. 开启生产者发送消息,等待十秒之后消息过期就会到达死信队列;
  4. 然后启动消费者2接收死信队列里面的消息。

消费者1:

/**
 * @Description 死信队列消费者1 接收正常状态下的消息;异常情况下发送消息到死信队列。
 * @date 2022/3/9 10:41
 */
public class Consumer1 {


    // 队列名称
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DEAD_QUEUE = "dead_queue";

    // 交换机名称
    private static final String NORMAL_EXCHANGE = "e_ch";
    private static final String DEAD_EXCHANGE = "d_ch";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明队列和交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        Map<String ,Object> arguments = new HashMap<>();
        // 设置过期时间 10秒 ; 换做在发送方指定。
//        arguments.put("x-message-ttl",10000);
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");

        // 创建队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false
                ,arguments);
        // 绑定RoutingKey
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

        // 创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("C1等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };

        channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag ->{});
    }

}

消费者2:

/**
 * @Description 死信队列消费者2 接收死信队列里面的消息。
 * @date 2022/3/9 10:41
 */
public class Consumer2 {

    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        System.out.println("C2等待接收消息...");
        DeliverCallback deliverCallback = (consumerTag, message) ->{
            System.out.println("C2接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(DEAD_QUEUE,true, deliverCallback, consumerTag ->{});
    }

}

生产者:

/**
 * @Description 死信队列生产者
 * @date 2022/3/9 10:57
 */
public class Producer {

    private static final String NORMAL_EXCHANGE = "e_ch";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 声明交换机。
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 设置TTL时间存活时间 10 秒
        AMQP.BasicProperties properties =
                new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 0; i < 11; i++) {
            String msg = "message" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送成功:" + msg);
        }
    }
}

1.4.2 队列达到最大长度

修改消费者1,指定最多长度为6条消息,超过这6条消息之后的消息会进入到死信中。
消费者1给村参数的map新增一条:

arguments.put("x-max-length",6);

删除生产者中对队列消息事件的限制。
如果出现406错误就是已经存在队列,可以删除原有队列或者修改一个新名字。

1.4.3 消息被拒

修改消费者1,拒绝最后一位为双数的消息。

消费者1:

/**
 * @Description 死信队列消费者1 接收正常状态下的消息;异常情况下发送消息到死信队列。
 * @date 2022/3/9 10:41
 */
public class Consumer1 {


    // 队列名称
    private static final String NORMAL_QUEUE = "normal_queue1";
    private static final String DEAD_QUEUE = "dead_queue1";

    // 交换机名称
    private static final String NORMAL_EXCHANGE = "e_ch1";
    private static final String DEAD_EXCHANGE = "d_ch1";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        // 声明队列和交换机类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        Map<String ,Object> arguments = new HashMap<>();
        // 设置死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        // 设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");

        // 创建队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false
                ,arguments);
        // 绑定RoutingKey
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

        // 创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        System.out.println("C1等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) ->{
            // 拒绝最后为单数的消息
            String msg = new String(message.getBody(), StandardCharsets.UTF_8);
            if (Integer.parseInt(msg.substring(msg.length()-1)) % 2 == 0){
                System.out.println("C1拒绝的消息:" + msg +"----------===========-----------");
                // 拒绝消息
                /**
                 * basicReject
                 *          1. 消息tag表示;
                 *          2. 是否再放回队列中(不放就自动到死信队列中)。
                 */
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else{
                System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        };
        // 开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});
    }

}

先启动消费者1创建对应关系,随之启动消费者2,再启动生产者。

生产者发送:
在这里插入图片描述
消费者1:
在这里插入图片描述
消费者2:
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-10 22:17:51  更:2022-03-10 22:18:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:37:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码