IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习总结(中)——发布确认、交换机和死信队列 -> 正文阅读

[大数据]RabbitMQ学习总结(中)——发布确认、交换机和死信队列

一、发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

开启发布确认的方法:

发布确认默认是没有开启的,如果要在发布者中开启需要调用方法confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法

//开启发布确认
channel.confirmSelect();

1.1 单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

public class ConfirmMessage {

    //批量发消息的个数
    public static int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1、单个确认
        publishMessageSingle();//发布1000个单独确认消息,耗时874秒
    }

    public static void publishMessageSingle() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量发消息
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());
            //单个消息就马上进行确认
            boolean flag = channel.waitForConfirms();
            if(flag) {
                System.out.println("消息发送成功");
            }
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end-begin) + "秒");
    }
}

执行结果:
在这里插入图片描述

1.2 批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

代码示例:

public class ConfirmMessage {

    //批量发消息的个数
    public static int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //2、批量确认
        publishMessageBatch();//发布1000个批量确认消息,耗时154秒
    }

    //批量发布确认
    public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        //批量确认消息大小
        int batchSize = 100;
        //批量发消息,批量发布确认
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());

            //判断达到100条消息的时候,批量确认一次
            if(i % batchSize == 0){
                boolean flag = channel.waitForConfirms();
                if(flag) {
                    System.out.println("消息发送成功");
                }
            }
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end-begin) + "秒");

    }
}

执行结果:
在这里插入图片描述

1.3 异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述
如何处理异步未确认消息?

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

public class ConfirmMessage {

    //批量发消息的个数
    public static int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //3、异步批量确认
        publishMessageAsync();//发布1000个异步确认消息,耗时33秒
    }

    //异步发布确认
    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //队列的声明
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long begin = System.currentTimeMillis();

        /**
         * 线程安全有序的一个哈希表,适用于高并发的情况下
         *  1.轻松的将序号于消息进行关联
         *  2.轻松的批量删除条目,只要给到序号
         *  3.支持高并发
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

        //消息确认成功回调函数
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            //=====2.删除掉已经确认的消息,剩下的就是未确认的消息=====
            if(multiple) {
                ConcurrentNavigableMap<Long, String> confirmd =
                        outstandingConfirms.headMap(deliveryTag);
            } else {
                outstandingConfirms.remove(deliveryTag);
            }

            System.out.println("确认的消息:" + deliveryTag);
        };
        /**
         * 消息确认失败回调函数
         *  1.消息的标识
         *  2.是否为批量确认
         */
        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            //=====3.打印一下未确认的消息都有哪些=====
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:" + message + "未确认的消息tag:" + deliveryTag);
        };
        /**
         * 准备消息的监听器,监听哪些消息成功了,哪些消息失败了
         *  1.消息发送成功的监听对象
         *  2.消息发送失败的监听对象
         */
        channel.addConfirmListener(ackCallback,nackCallback);//异步通知

        for (int i = 1; i <= MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",queueName,null,message.getBytes());

            //======1.此处记录下所有要发送的消息,消息的总和=====
            outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
        }

        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end-begin) + "秒");
    }
}

执行结果:
在这里插入图片描述


总结:

单独发布消息

  • 同步等待确认,简单,但吞吐量非常有限。

批量发布消息

  • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

异步处理:

  • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

二、交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

发布消息方法:
在这里插入图片描述
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话


临时队列:

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

  • 创建临时队列的方式如下:
    String queueName = channel.queueDeclare().getQueue();
    

创建出来之后长成这样:
在这里插入图片描述


绑定 binding:

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
在这里插入图片描述

2.1 Fanout exchange(发布/订阅模式)

Fanout exchange又叫发布订阅模式。扇出交换机将消息路由到与其绑定的所有队列,并且路由键将被忽略。如果将N个队列绑定到扇出交换,则将新消息发布到该交换时,会将消息的副本传递到所有N个队列。扇出交换机非常适合消息的广播路由
注意:fanout类型的exchange会把消息推到所有的queue中,所以不需要指定routingkey,指定了也没用

系统中默认有fanout类型的exchange
在这里插入图片描述

实现效果:EmitLog(生产者)发送消息给两个消费者接收并打印接收到的信息
在这里插入图片描述
代码示例:

EmitLog 发送消息给两个消费者接收:

public class EmitLog {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生成这发出消息:" + message);
        }
    }
}

ReceiveLogs02将接收到的消息打印

public class ReceiveLogs01 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接的时候,队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列,其中routingkey(也称之为 binding key)为空字符串,广播模式下路由键将被忽略
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs01等待接收消息,把接收到的消息打印在屏幕上.....");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
    }
}

ReceiveLogs02将接收到的消息打印在控制台

public class ReceiveLogs02 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接的时候,队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列,其中routingkey(也称之为 binding key)为空字符串,广播模式下路由键将被忽略
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs02等待接收消息,把接收到的消息打印在屏幕上.....");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
    }
}

效果展示:
在这里插入图片描述

2.2 Direct exchange(路由模式)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routingkey完全一致,才会接收到消息

在这里插入图片描述
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。


多重绑定:

在这里插入图片描述
当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。


实战实现效果:

在这里插入图片描述
c2:绑定disk,routingKey为error
c1:绑定console,routingKey为info、warning

  1. 生产者:

    public class DirectLogs {
        //交换机的名称
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //创建多个 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "普通 info 信息");
            bindingKeyMap.put("warning", "警告 warning 信息");
            bindingKeyMap.put("error", "错误 error 信息");
            //debug 没有消费这接收这个消息 所有就丢失了
            bindingKeyMap.put("debug", "调试 debug 信息");
    
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                //获取 key value
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
    
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
    
  2. 消费者C1:

    public class ReceiveLogsDirect01 {
    
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare("console",false,false,false,null);
    
            //把该临时队列绑定名为EXCHANGE_NAME的交换机, 其中 routingkey(也称之为 binding key)为info
            channel.queueBind("console",EXCHANGE_NAME,"info");
            channel.queueBind("console",EXCHANGE_NAME,"warning");
            System.out.println("ReceiveLogsDirect01等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("info和warning 消息已经接收:\n" + message);
            };
    
            channel.basicConsume("console",true,deliverCallback,(consumerTag) -> {});
    
        }
    }
    
  3. 消费者C2:

    public class ReceiveLogsDirect02 {
    
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare("disk",false,false,false,null);
    
            //绑定交换机与队列
            channel.queueBind("disk",EXCHANGE_NAME,"error");
            System.out.println("ReceiveLogsDirect02等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("error 消息已经接收:\n" + message);
            };
    
            channel.basicConsume("disk",true,deliverCallback,(consumerTag) -> {});
    
        }
    }
    
  4. 执行结果:
    在这里插入图片描述

2.3 Topics 模式

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert


Topic的要求:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

Topic匹配案例:

下图绑定关系如下:
RabbitMQ-00000046

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

例子说明
quick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

注意:

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct

实战实现效果:实现上面的匹配案例

在这里插入图片描述

  1. 生产者

    public class EmitLogTopic {
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            /**
             * Q1-->绑定的是
             * 中间带 orange 带 3 个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
             * 第一个单词是 lazy 的多个单词(lazy.#)
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
    
            for (Map.Entry<String, String> stringEntry : bindingKeyMap.entrySet()) {
                String routingKey = stringEntry.getKey();
                String message = stringEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者发送消息:" + message);
            }
        }
    }
    
  2. 消费者C1

    /**
     * 消费者C1,接收中间带 orange 带 3 个单词的字符串 (*.orange.*)
     */
    public class ReceiveLogsTopic01 {
        //交换机的名称
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            String queueName = "Q1";
            channel.queueDeclare(queueName,false,false,false,null);
            //绑定交换机和队列
            channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
            System.out.println("Q1等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Q1控制台打印接收到的消息:" + new String(message.getBody()));
                System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
    
            };
            channel.basicConsume(queueName,true,deliverCallback,(consumerTag) ->{});
        }
    }
    
  3. 消费者C2

    /**
     * 消费者C2,接收最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)和第一个单词是 lazy 的多个单词 (lazy.#)
     */
    public class ReceiveLogsTopic02 {
        //交换机的名称
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            String queueName = "Q2";
            channel.queueDeclare(queueName,false,false,false,null);
            //绑定交换机和队列
            channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
            channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
            System.out.println("Q2等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Q2控制台打印接收到的消息:" + new String(message.getBody()));
                System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
            };
            channel.basicConsume(queueName,true,deliverCallback,(consumerTag) ->{});
        }
    }
    
  4. 执行结果:
    在这里插入图片描述

三、死信队列

死信队列简介:

  • 死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

  • 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源:

  • 消息 TTL 过期,TTL是Time To Live的缩写, 也就是生存时间

  • 队列达到最大长度,队列满了,无法再添加数据到 mq 中

  • 消息被拒绝,(basic.reject 或 basic.nack) 并且 requeue=false(手动确认)

死信实战效果:
在这里插入图片描述

3.1 死信之TTl

  1. 消费者C1,用于消费正常队列里的消息

    public class Consumer01 {
    
        //普通交换机名称
        private static final String NORMAL_EXCHANGE = "normal_exchange";
        //死信交换机名称
        private static final String DEAD_EXCHANGE = "dead_exchange";
        //普通队列的名称
        private static final  String NORMAL_QUEUE = "normal_queue";
        //死信队列的名称
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            //声明死信和普通交换机 类型为 direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
    
            //声明普通队列
            Map<String,Object> arguments = new HashMap<>();
    
            //过期时间 10s;可以设置普通队列中消息的过期时间,也可以在生产者发消息的时候指定过期时间
    //        arguments.put("x-message-ttl",10000);
            //正常队列设置死信交换机
            arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //设置死信RoutingKey
            arguments.put("x-dead-letter-routing-key","lisi");
            
            channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
    
            //声明死信队列
            channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
    
            //绑定普通的交换机与普通的队列:队列、交换机、路由键(routingKey)
            channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
            //绑定死信的交换机与死信的队列
            channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
    
            System.out.println("等待接收消息........... ");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer01 接收到消息" + message);
            };
    
            channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,(consumerTag) ->{});
        }
    }
    
  2. 消费者C2,用于消费死信队列里的消息

    public class Consumer02 {
        //死信队列的名称
        private static final String DEAD_QUEUE = "dead_queue";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            System.out.println("等待接收消息........... ");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer02 接收到消息" + message);
            };
    
            channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag) ->{});
        }
    }
    
  3. 生产者

    public class Producer {
        //普通交换机名称
        private static final String NORMAL_EXCHANGE = "normal_exchange";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
    
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            //设置消息的 TTL 时间 10s
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
            //该信息是用作演示队列个数限制
            for (int i = 1; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
                System.out.println("生产者发送消息:" + message);
            }
        }
    }
    
  4. 运行测试:启动 C1 ,之后关闭消费者,模拟其接收不到消息。再启动生产者

    在这里插入图片描述
    以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息
    在这里插入图片描述

3.2 死信之最大长度

  1. 消息生产者代码去掉TTL属性
    在这里插入图片描述

  2. C1消费者中设置队列的最大的长度,C2代码不变
    在这里插入图片描述

    //设置正常队列的长度限制,例如发10个,4个则为死信
    params.put("x-max-length",6);
    
  3. 运行测试

    C1启动之后关闭该消费者 模拟其接收不到消息,之后启动C2消费者
    在这里插入图片描述

3.3 死信之消息被拒

  1. 生产者和C2消费者的代码不变,在C1消费中设置拒收消息 "info5"
    在这里插入图片描述

  2. 模拟运行

    启动生产者
    在这里插入图片描述
    启动消费者C1然后再启动消费者C2
    在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:53:50  更:2021-08-06 09:55:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:05:25-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码