IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ -> 正文阅读

[大数据]RabbitMQ

RabbitMQ

各个MQ介绍

ActiveMQ

在这里插入图片描述

kafak

在这里插入图片描述

RocketMQ

在这里插入图片描述

RabbitMQ

在这里插入图片描述

MQ的选择

在这里插入图片描述

RabbitMQ

四大核心概念

在这里插入图片描述
在这里插入图片描述

### 生产者

原理名词解释

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Hello Word

环境准备

maven环境

    <!--指定 jdk 编译版本--> <build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
    <dependencies>
    <!--rabbitmq 依赖客户端-->
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.8.0</version>
    </dependency>
    <!--操作文件流的一个依赖-->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

生产者

package com.wh;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class HelloWordDemo {
    //队列名称
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //工厂IP,连接RabbitMQ的队列
        connectionFactory.setHost("localhost");
        //用户名
        connectionFactory.setUsername("guest");
        //密码
        connectionFactory.setPassword("guest");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //获取信道
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         *
         * 参数设置
         * 1:队列名称
         * 2:队列里面的消息是否持久化(磁盘)默认情况消息存储到内存中
         * 3:该队列是否只供一个消费者进行消费,是否进行消息共享,true:可以多个消费者消费 false:只能一个消费者消费
         * 4:是否自动删除  最后一个消费者断开连接以后,该队列是否自动删除 true:自动删除  false:不自动删除
         * 5:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发消息
        String message="hello word";
        /**
         * 发送一个消息
         *
         * 参数:
         * 1:发送到那个交换机
         * 2:路由的key值是哪个 本次是队列的名称
         * 3:其他参数信息
         * 4:发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("发送消息完成");
    }
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者

package com.wh.helloword;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    //队列名称
    public static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //工厂IP,连接RabbitMQ的队列
        connectionFactory.setHost("localhost");
        //用户名
        connectionFactory.setUsername("guest");
        //密码
        connectionFactory.setPassword("guest");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建信道
        Channel channel = connection.createChannel();

        //声明  接收消息
        DeliverCallback deliverCallback=(customerTag,message)->{
            System.out.println("未成功消费的回调==>"+new String(message.getBody()));
        };
        //声明  取消消费的回调
        CancelCallback cancelCallback=(customerTag)->{
            System.out.println("取消消费的回调==>消息消费中断");
        };

        /**
         * 消费之消费消息
         *
         * 参数:
         * 1:消费哪个队列
         * 2:消费成功后是否要自动应答 true:自动应答  false:手动应答
         * 3:消费者成功消费的回调
         * 4:消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

在这里插入图片描述

Work Queues(工作模式)

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。
相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进
程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
在这里插入图片描述

轮训分发消息

工具类

package com.wh.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 此类为连接工厂创建信道的工具类
 */
public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

消费者

package com.wh.workQueues;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

public class Work01 {
    //队列名称
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明  接收消息
        DeliverCallback deliverCallback=(customerTag, message)->{
            System.out.println("未成功消费的回调==>"+new String(message.getBody()));
        };
        //声明  取消消费的回调
        CancelCallback cancelCallback=(customerTag)->{
            System.out.println("取消消费的回调==>消息消费中断");
        };

        /**
         * 消费之消费消息
         *
         * 参数:
         * 1:消费哪个队列
         * 2:消费成功后是否要自动应答 true:自动应答  false:手动应答
         * 3:消费者成功消费的回调
         * 4:消费者取消消费的回调
         */
        System.out.println("C1等待接收消息");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

生产者

package com.wh.workQueues;

import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.Scanner;

public class Task01 {
    //队列名称
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 生成一个队列
         *
         * 参数设置
         * 1:队列名称
         * 2:队列里面的消息是否持久化(磁盘)默认情况消息存储到内存中
         * 3:该队列是否只供一个消费者进行消费,是否进行消息共享,true:可以多个消费者消费 false:只能一个消费者消费
         * 4:是否自动删除  最后一个消费者断开连接以后,该队列是否自动删除 true:自动删除  false:不自动删除
         * 5:其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner scanner=new Scanner(System.in);
        while (scanner.hasNext()){
            String next = scanner.next();
            /**
             * 发送一个消息
             *
             * 参数:
             * 1:发送到那个交换机
             * 2:路由的key值是哪个 本次是队列的名称
             * 3:其他参数信息
             * 4:发送消息的消息体
             */
            channel.basicPublish("",QUEUE_NAME,null,next.getBytes());
            System.out.println("发送消息完成=>"+next);
        }
    }
}

结果

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:
消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢
失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终
使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用

消息应答方法

A.Channel.basicAck(用于肯定确认)
RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

B.Channel.basicNack(用于否定确认)

C.Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数
不处理该消息了直接拒绝,可以将其丢弃了

Multiple 的解释

建议用false:不批量应答
手动应答的好处是可以批量应答并且减少网络拥堵
在这里插入图片描述
multiple 的 true 和 false 代表不同意思

true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
5-8 的这些还未应答的消息都会被确认收到消息应答

false 同上面相比
只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

在这里插入图片描述
在这里插入图片描述

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

消息手动应答代码

生产者
package com.wh.Ack;

import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.Scanner;

public class Task02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String next = scanner.next();
            channel.basicPublish("", QUEUE_NAME, null, next.getBytes("UTF-8"));
            System.out.println("生产者发出消息");
        }
    }
}

消费者
package com.wh.Ack;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import com.wh.utils.SleepUtils;

public class Work03 {
    private static final String QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C1等待接收消息");
        DeliverCallback deliverCallback=(customerTag,message)->{
           //沉睡1s
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
            /**
             * 手动应答
             * 参数:
             * 1:消息的标记
             * 2:是否批量应答 false:不批量应答信道中的消息 true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };
        //采用手动应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(customerTag)->{
            System.out.println("取消消费的回调==>消息消费中断");
        });
    }
}

package com.wh.Ack;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import com.wh.utils.SleepUtils;

public class Work04 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("C2等待接收消息");
        DeliverCallback deliverCallback = (customerTag, message) -> {
            //沉睡1s
            SleepUtils.sleep(30);
            System.out.println("接收到消息:" + new String(message.getBody(), "UTF-8"));
            /**
             * 手动应答
             * 参数:
             * 1:消息的标记
             * 2:是否批量应答 false:不批量应答信道中的消息 true:批量
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (customerTag) -> {
            System.out.println("取消消费的回调==>消息消费中断");
        });
    }
}

手动应答效果演示

在这里插入图片描述
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是
由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,
此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

持久化

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消
息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列
和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列消息都标记为持久化。

队列的持久化

在这里插入图片描述
在这里插入图片描述

消息的持久化

在这里插入图片描述

不公平分发(消费者端)

在这里插入图片描述
为了避免这种情况,我们可以设置参数 channel.basicQos(1);
在这里插入图片描述
在这里插入图片描述

代码及演示

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

预取值

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

publish/Subscribe(发布确认模式)

在这里插入图片描述

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
在这里插入图片描述

单个确认发布

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。

这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

package com.wh.confirm;

import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.UUID;

/**
 * 发布确认模式
 * 使用的时间 比较那种确认方式是最好的
 * 1: 单个确认模式
 * 2:批量确认
 * 3:异步批量确认
 */
public class ConfirmMessage {
    //批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1:单个确认模式
        ConfirmMessage.publishMessageIndividually();
        //2:批量确认
        //3:异步批量确认
    }

    //单个确认
    public static void publishMessageIndividually() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, true, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long start = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queue, null, message.getBytes());
            //等待确认
            boolean confimOK = channel.waitForConfirms();
            if (confimOK) {
                System.out.println("发布成功");
            }
        }
        //开始时间
        long end = System.currentTimeMillis();
        System.out.println("====发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
    }
}

在这里插入图片描述

批量确认发布

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

package com.wh.confirm;

import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.UUID;

/**
 * 发布确认模式
 * 使用的时间 比较那种确认方式是最好的
 * 1: 单个确认模式
 * 2:批量确认
 * 3:异步批量确认
 */
public class ConfirmMessage {
    //批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1:单个确认模式
//        ConfirmMessage.publishMessageIndividually();//====发布1000条消息,共耗时342毫秒
        //2:批量确认
        ConfirmMessage.publishMessageBatch();//====发布1000条消息,共耗时39毫秒
        //3:异步批量确认
    }
    //批量确认
    public static void publishMessageBatch() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, false, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long start = System.currentTimeMillis();
        //批量确认消息的大小
        int messageBatch=100;
        //未确认消息的大小
        int outStandingMessageCount= 0;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queue, null, message.getBytes());
            outStandingMessageCount++;
            if(messageBatch == outStandingMessageCount){
                //等待确认
                 channel.waitForConfirms();
                outStandingMessageCount=0;
            }
        }
        //确保若还有剩余消息则可以确认
        if(outStandingMessageCount>0){
            channel.waitForConfirms();
        }
        //开始时间
        long end = System.currentTimeMillis();
        System.out.println("====发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
    }

}

在这里插入图片描述

异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。
在这里插入图片描述

package com.wh.confirm;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.wh.utils.RabbitMqUtils;

import java.util.UUID;

/**
 * 发布确认模式
 * 使用的时间 比较那种确认方式是最好的
 * 1: 单个确认模式
 * 2:批量确认
 * 3:异步批量确认
 */
public class ConfirmMessage {
    //批量发消息的个数
    public static final int MESSAGE_COUNT = 1000;

    public static void main(String[] args) throws Exception {
        //1:单个确认模式
//        ConfirmMessage.publishMessageIndividually();//====发布1000条消息,共耗时342毫秒
        //2:批量确认
//        ConfirmMessage.publishMessageBatch();//====发布1000条消息,共耗时39毫秒
        //3:异步批量确认
        ConfirmMessage.publishMessageAsync();//====发布1000条消息,共耗时27毫秒
    }
    //异步确认发布
    public static void publishMessageAsync() throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        String queue = UUID.randomUUID().toString();
        channel.queueDeclare(queue, false, false, false, null);
        //开启发布确认
        channel.confirmSelect();
        //开始时间
        long start = System.currentTimeMillis();
        //消息确认成功 --回调函数
        /**
         * deliveryTag:消息的标记
         * multiple:是否批量确认
         */
        ConfirmCallback  ackConfirmCallback=(deliveryTag,multiple)->{

        };
        //消息确认失败 --回调函数
        ConfirmCallback  nackConfirmCallback=(deliveryTag,multiple)->{
            System.out.println("未确认的消息:"+deliveryTag);

        };
        //准备消息的监听器,监听那些成功了,那些失败了
        /**
         * ackConfirmCallback:监听那些成功了
         * nackConfirmCallback:监听那些失败了
         */
        channel.addConfirmListener(ackConfirmCallback,nackConfirmCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String message = i + "";
            channel.basicPublish("", queue, null, message.getBytes());
        }

        //开始时间
        long end = System.currentTimeMillis();
        System.out.println("发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
    }
}

在这里插入图片描述

如何处理异步未确认消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

交换机

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为 ”发布/订阅”

Exchanges

Exchanges 概念

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定

Exchanges 的类型

直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)

无名 exchange

在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
在这里插入图片描述
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实
是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话

临时队列

之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除

创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();

绑定

什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
在这里插入图片描述

Fanout(扇出)

Fanout 介绍

Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型
在这里插入图片描述

Fanout 实战

消费者
package com.wh.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

/**
 * @author wanghao
 * @date 2021/7/28 21:44
 */
public class ReceiveLogs01 {
    private static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        /**
         *生成一个临时队列,队列的名称是随机的
         * 当消费者断开和队列的连接时,队列自动删除
         */
        String queue = channel.queueDeclare().getQueue();
        //把临时队列绑定到exchange其中routingkey(也叫binding key)为空字符串
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("等待接收消息,把收到的消息打印到屏幕上");
        DeliverCallback deliverCallback=(customerTag,delivery)->{
            System.out.println("ReceiveLogs01控制台打印接受到的消息:"+new String(delivery.getBody(), "UTF-8"));

        };
        channel.basicConsume(queue,true,deliverCallback,(customerTag)->{});
    }
}


package com.wh.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

/**
 * @author wanghao
 * @date 2021/7/28 21:44
 */
public class ReceiveLogs02 {
    private static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        /**
         *生成一个临时队列,队列的名称是随机的
         * 当消费者断开和队列的连接时,队列自动删除
         */
        String queue = channel.queueDeclare().getQueue();
        //把临时队列绑定到exchange其中routingkey(也叫binding key)为空字符串
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs02等待接收消息,把收到的消息打印到屏幕上");
        DeliverCallback deliverCallback=(customerTag,delivery)->{
            System.out.println("ReceiveLogs02控制台打印接受到的消息:"+new String(delivery.getBody(), "UTF-8"));

        };
        channel.basicConsume(queue,true,deliverCallback,(customerTag)->{});
    }
}

生产者
package com.wh.exchange;

import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.Scanner;

/**
 * @author wanghao
 * @date 2021/7/28 22:02
 */
public class EmitLog {
    private static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String next = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes());
            System.out.println("生产者发出消息:"+next);
        }
    }
}

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Direct(直接)

回顾

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用码:channel.queueBind(queueName,EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。

在这里插入图片描述

消费者

package com.wh.exchange.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

/**
 * @author wanghao
 * @date 2021/7/28 21:44
 */
public class ReceiveLogsDirect01 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("console", false, false, false, null);
        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "wraning");
        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            System.out.println("ReceiveLogsDirect01控制台接收消息:" + new String(delivery.getBody(), "UTF-8"));
        };
        channel.basicConsume("console", true, deliverCallback, (customerTag) -> {
        });
    }
}
package com.wh.exchange.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

/**
 * @author wanghao
 * @date 2021/7/28 21:44
 */
public class ReceiveLogsDirect02 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.queueDeclare("disk", false, false, false, null);
        channel.queueBind("disk", EXCHANGE_NAME, "error");
        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            System.out.println("ReceiveLogsDirect02控制台接收消息:" + new String(delivery.getBody(), "UTF-8"));
        };
        channel.basicConsume("disk", true, deliverCallback, (customerTag) -> {
        });
    }
}

生产者

package com.wh.exchange.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.Scanner;

/**
 * @author wanghao
 * @date 2021/7/28 22:41
 */
public class DirectLog {
    private static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String next = scanner.next();
            //routindkey 测试三个值:info warning error
            channel.basicPublish(EXCHANGE_NAME,"info",null,next.getBytes());
            System.out.println("生产者发出消息:"+next);
        }
    }
}

测试结果

routind key==info

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

routind key==warning

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

routind key==error

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Topics(主题)

之前类型的问题

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。

尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型。

Topic 的要求

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的

*(星号)可以代替一个单词 #(井号)可以替代零个或多个单词

Topic 匹配案例

下图绑定关系如下

Q1–>绑定的是
--------中间带 orange 带 3 个单词的字符串(.orange.)

Q2–>绑定的是
--------最后一个单词是 rabbit 的 3 个单词(..rabbit)
--------第一个单词是 lazy 的多个单词(lazy.#)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
当队列绑定关系是下列这种情况时需要引起注意

当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了

如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

实战

消费者
package com.wh.exchange.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

public class ReceiveLogsTopic01 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q1";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");

        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, customerTag -> {
        });
    }
}

package com.wh.exchange.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

public class ReceiveLogsTopic02 {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String QUEUE_NAME = "Q2";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");

        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, customerTag -> {
        });
    }
}

生产者
package com.wh.exchange.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;

/**
 * @author wanghao
 * @date 2021/7/29 21:47
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        HashMap<String, String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("quick.orange.rabbit"," 被队列 Q1Q2 接收到");
        bindingKeyMap.put("lazy.orange.elephant"," 被队列 Q1Q2 接收到");
        bindingKeyMap.put("quick.orange.fox"," 被队列 Q1 接收到");
        bindingKeyMap.put("lazy.brown.fox"," 被队列 Q2 接收到");
        bindingKeyMap.put("lazy.pink.rabbit"," 虽然满足两个绑定但只被队列 Q2 接收一次");
        bindingKeyMap.put("quick.brown.fox"," 不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit"," 是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit"," 是四个单词但匹配 Q2");
        
        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+message);
        }
    }
}
结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

死信队列

死信的概念

在这里插入图片描述

死信来源

消息 TTL 过期

队列达到最大长度(队列满了,无法再添加数据到 mq 中)

消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信实战

代码架构图

在这里插入图片描述

死信实战–过期时间

消费者(普通消费者)
package com.wh.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

import java.util.HashMap;

/**
 * 死信队列--普通消费者声明
 *
 * @author wanghao
 * @date 2021/7/29 23:11
 */
public class Customer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        
        //声明普通队列
        HashMap<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);

        
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机和死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("Consumer01等待接收消息..........");
        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Consumer01 接收到消息" + message);
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (customerTag) -> {
        });
    }
}

生产者
package com.wh.dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

/**
 * 死信队列--生产者
 */
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //设置消息的 TTL 时间
        AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message="info:"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

结果

先启动普通消费者,让交换机,队列被创建并进行绑定。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

再关闭普通消费者后,再启动生产者发消息,模拟消息超时进入私信队列。
在这里插入图片描述

在这里插入图片描述

消费者(死信消费者)
package com.wh.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

import java.util.HashMap;

/**
 * 死信队列--死信消费者声明
 *
 * @author wanghao
 * @date 2021/7/29 23:11
 */
public class Customer02 {

    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("死信消费者Consumer02等待接收消息..........");
        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("死信消费者Consumer02 接收到消息" + message);
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (customerTag) -> {
        });
    }
}

结果

在这里插入图片描述

死信实战–队列达到最大长度

生产者

注释掉过期时间即可
在这里插入图片描述

消费者(普通消费者)

限制队列的长度
在这里插入图片描述

结果

启动普通消费者
在这里插入图片描述
删除普通队列后,再启动普通消费者,注册上普通队列
在这里插入图片描述

停掉普通消费者后,生产者发送10条消息,为了不让普通消费者消费消息,让消息能够积压
在这里插入图片描述
在这里插入图片描述

死信实战–消息被拒

消费者(普通消费者)
package com.wh.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;

import java.util.HashMap;

/**
 * 死信队列--普通消费者声明
 *
 * @author wanghao
 * @date 2021/7/29 23:11
 */
public class Customer01 {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    private static final String NORMAL_QUEUE = "normal_queue";
    private static final String DEAD_EXCHANGE = "dead_exchange";
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明死信和普通交换机 类型为 direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        
        //声明普通队列
        HashMap<String, Object> params = new HashMap<>();
        // 正常队列设置死信交换机 参数 key 是固定值
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // 正常队列设置死信 routing-key 参数 key 是固定值
        params.put("x-dead-letter-routing-key", "lisi");
        //设置正常队列长度的限制
        //params.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);

        
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
        

        //绑定普通交换机和普通队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机和死信队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("Consumer01等待接收消息..........");
        DeliverCallback deliverCallback = (customerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            if(message.equals("info:4")){
                System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
                //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }else {
            System.out.println("Consumer01 接收到消息"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
        };
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (customerTag) -> {
        });
    }
}

生产者
package com.wh.dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;

/**
 * 死信队列--生产者
 */
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //设置消息的 TTL 时间
//        AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
        for (int i = 0; i < 10; i++) {
            String message="info:"+i;
//            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes("UTF-8"));
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+message);
        }
    }
}

结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

延迟队列

延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列使用场景

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。

通过队列属性设置,队列中所有消息都具有相同的过期时间

对消息本身进行单独设置,只该条消息具有过期时间

如果同时设置,则消息的TTL以较小的值为准。

消息在队列中的生存时间一旦超过设置的TTL值,就会变成死信(Dead Message。

消息设置 TTL(最好生产者-灵活)

在这里插入图片描述

队列设置 TTL

第一种是在创建队列的时候设置队列的“x-message-ttl”属性
在这里插入图片描述

二者区别

在这里插入图片描述

rabbit整合Springboot

pom

 <dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

swagger

package com.wh.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("enjoy6288", "http://atguigu.com",
                        "1551388580@qq.com"))
                .build();
    }
}

application.yml


spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
server:
  port: 8899

队列 TTL

代码架构图

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:
在这里插入图片描述

配置文件类代码

package com.wh.ttlQueue;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * ttl的queue
 */
@Configuration
public class TtlQueueConfig {
    //声明普通交换机
    private static final String X_EXCHANGE = "X";
    //声明死信交换机
    private static final String Y_DEAD_LETTER_EXCHANGE = "Y";

    //声明普通队列
    private static final String QUEUE_A = "QA";
    private static final String QUEUE_B = "QB";
    //声明死信队列
    private static final String DEAD_LETTER_QUEUE = "QD";

    /**
     * 普通交换机
     *
     * @return
     */
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    /**
     * 死信交换机
     *
     * @return
     */
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明普通队列-A 队列ttl为10s
     */
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> arguments=new HashMap<>(4);
        //声明死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //声明死信路由
        arguments.put("x-dead-letter-routing-key","YD");
        //声明队列的过期时间--ms单位
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
    }
    /**
     * 声明普通队列-B 队列ttl为40s
     */
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> arguments=new HashMap<>(4);
        //声明死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //声明死信路由
        arguments.put("x-dead-letter-routing-key","YD");
        //声明队列的过期时间--ms单位
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
    }

    /**
     * 声明死信队列-D
     */
    @Bean("queueD")
    public Queue queueD() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }
    /**
     *  queueA绑定
     */
    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){
      return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }
    /**
     *  queueB绑定
     */
    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange){
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    /**
     *  queueD绑定
     */
    @Bean
    public Binding queueBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}

消息生产者代码

package com.wh.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/seandMsg/{message}")
    public void sendMsg(@PathVariable("message") String message) {
        log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列"+message);

    }
}

消息消费者代码

package com.wh.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

结果

在这里插入图片描述
在这里插入图片描述
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然
后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?

延时队列优化

代码架构图

在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间
在这里插入图片描述

配置文件类代码

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消息生产者代码

@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
    public void sendMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。

Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题

安装延时队列插件

在这里插入图片描述
执行命令让插件生效
在这里插入图片描述
在这里插入图片描述
重启RabbitMQ后
在这里插入图片描述

情况的不同

延时队列–基于队列的延迟

在这里插入图片描述

延时队列–基于交换机的延迟

在这里插入图片描述

代码架构图

在这里插入图片描述

配置文件类代码

package com.wh.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * 延迟交换机
 */
@Configuration
public class DelayedExchangeConfig {
    private static final String DELAYED_QUEUE_NAME = "delayed_queue";
    private static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    private static final String DELAYED_ROUTING_KEY = "delayed_routingKey";

    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
    }

    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
        /**
         * 参数
         * 1:交换机的名称
         * 2:交换机的类型
         * 3:是否持久化
         * 4:是否自动删除
         * 5:其他参数
         */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,arguments);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }

}

消息生产者代码

  /**
     * 基于插件发送消息及延迟时间
     * @param message
     * @param delayTime
     */
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable("message") String message, @PathVariable("delayTime") Integer delayTime) {
        log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), delayTime, message);
        rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routingKey", message, msg -> {
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });
    }

消息消费者代码

package com.wh.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@Slf4j
public class DeadLetterQueueConsumer {
    private static final String DELAYED_QUEUE_NAME="delayed_queue";
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
    @RabbitListener(queues = DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message,Channel channel){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟交换机的消息:{}",new Date().toString(),msg);

    }
}

结果

发送请求:
http://localhost:8899/ttl/sendDelayMsg/延迟消息-20000/20000
http://localhost:8899/ttl/sendDelayMsg/延迟消息-2000/2000

在这里插入图片描述

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

发布确认高级

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?
特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:

发布确认 springboot 版本

确认机制方案

在这里插入图片描述
交换机和队列都有可能出错导致接受不到消息
1:交换机出错 (交换机收不到消息)
2:队列出错(交换机收到消息,发给队列出问题)
3:交换机和队列都出错了(也归到交换机收不到消息)

代码架构图

在这里插入图片描述

配置文件

在配置文件当中需要添加
spring.rabbitmq.publisher-confirm-type=correlated
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

添加配置类

package com.wh.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME="confirm.queue";
    public static final String CONFIRM_ROUTING_KEY="key1";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    @Bean
    public Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

消息生产者

package com.wh.controller;

import com.wh.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class SendConfirmController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        //发送无误
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);

        //模拟交换机出错
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"111",
                ConfirmConfig.CONFIRM_ROUTING_KEY, message+"模拟交换机出错", correlationData2);
        //模拟队列出错
        CorrelationData correlationData3 = new CorrelationData();
        correlationData3.setId("3");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData3);
    }
}

回调接口

package com.wh.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
    }
    /**
     * 交换机不管是否收到消息的一个回调方法
     * 1:发消息,交换机收到了,回调
     * 1.1 correlationData:保存回调消息的id及相关信息
     * 1.2 交换机收到消息 ack true
     * 1.3 caues null
     * <p>
     * 2:发消息,交换机没收到,回调
     * 2.1 correlationData:保存回调消息的id及相关信息
     * 2.2 交换机收到消息 ack false
     * 2.3 caues 失败的原因
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId() != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }
}

消息消费者

/**
     * 发布确认高级--confirm的消费者
     *
     * @param message
     */
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("接受到队列 confirm.queue 消息:{}", msg);
    }

结果演示

在这里插入图片描述

在这里插入图片描述
第三条消息因为routingkey不对,所以消息被直接丢弃了

回退消息

Mandatory 参数

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

消息生产者代码

package com.wh.controller;

import com.wh.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/confirm")
@Slf4j
public class SendConfirmController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable("message") String message) {
        //发送无误
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("生产者发送消息:{},id为:{}",message,"1");

        //模拟交换机出错
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"111",
                ConfirmConfig.CONFIRM_ROUTING_KEY, message+"模拟交换机出错", correlationData2);
        log.info("生产者发送消息:{},id为:{}",message,"2");

        //模拟队列出错
        CorrelationData correlationData3 = new CorrelationData();
        correlationData3.setId("3");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData3);
        log.info("生产者发送消息:{},id为:{}",message,"3");
    }
}

回调接口

package com.wh.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 交换机不管是否收到消息的一个回调方法
     * 1:发消息,交换机收到了,回调
     * 1.1 correlationData:保存回调消息的id及相关信息
     * 1.2 交换机收到消息 ack true
     * 1.3 caues null
     * <p>
     * 2:发消息,交换机没收到,回调
     * 2.1 correlationData:保存回调消息的id及相关信息
     * 2.2 交换机收到消息 ack false
     * 2.3 caues 失败的原因
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData.getId() != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String
            exchange, String routingKey) {
        log.error("  消 息 {}, 机 被 交 换 机 {} 因 退 回 , 退 回 原 因 :{}, 路 由 key:{}",new String(message.getBody()),exchange,replyText,routingKey);
    }
}

在这里插入图片描述
在这里插入图片描述

结果

在这里插入图片描述
在这里插入图片描述

备份交换机

在这里插入图片描述

代码架构图

在这里插入图片描述

修改配置类

package com.wh.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
                //是否持久化
                .durable(true)
                //设置该交换机的备份交换机
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    @Bean
    public Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    //备份交换机声明
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //备份队列声明
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    //报警队列声明
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    //声明备份队列绑定关系
    @Bean
    public Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    //声明报警队列绑定关系
    @Bean
    public Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费者

 /**
     * 报警消费者
     */
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.info("报警发现不可路由消息:{}", msg);
    }

生产者

@GetMapping("/sendBackMessage/{message}")
    public void sendBackMessage(@PathVariable("message") String message) {
        //发送无误
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
        log.info("生产者发送消息:{},id为:{}",message,"1");
        
        //模拟队列出错
        CorrelationData correlationData2 = new CorrelationData();
        correlationData2.setId("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData2);
        log.info("生产者发送消息:{},id为:{}",message,"2");
    }

结果

在这里插入图片描述

在这里插入图片描述
mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

RabbitMQ 其他知识点

幂等性

概念

在这里插入图片描述

消息重复消费

在这里插入图片描述

解决思路

在这里插入图片描述

消费端的幂等性保障

在这里插入图片描述

唯一 ID+指纹码机制

在这里插入图片描述

Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

优先级队列

使用场景

在这里插入图片描述
在这里插入图片描述

如何添加

在这里插入图片描述
d.注意事项
要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序

实战

配置类
package com.wh.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author wanghao
 * @date 2021/8/3 22:06
 */
@Configuration
public class PriorityConfig {
    public static final String PRIORITY_EXCHANGE_NAME = " priority.exchange";
    public static final String PRIORITY_QUEUE_NAME = " priority.queue";
    public static final String PRIORITY_ROUTING_KEY = "priority";


    //自定义交换机 我们在这里定义的是一个延迟交换机
    @Bean("priorityExchange")
    public DirectExchange priorityExchange(){
        return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).build();
    }
    @Bean("priorityQueue")
    public Queue priorityQueue(){
        return QueueBuilder.durable(PRIORITY_QUEUE_NAME).maxPriority(10).build();
    }
    @Bean
    public Binding priorityBinding(@Qualifier("priorityQueue")Queue priorityQueue,@Qualifier("priorityExchange")DirectExchange priorityExchange){
        return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PRIORITY_ROUTING_KEY);
    }
}
生产者
package com.wh.controller;

import com.wh.config.PriorityConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author wanghao
 * @date 2021/8/3 22:16
 */
@RestController
@RequestMapping("/priority")
public class SendPriorityController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg")
    public void sendMsg() {
        for (int i = 1; i < 11; i++) {
            int finalI = i;
            String message = "info" + i;
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(i + "");
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(finalI);
            messageProperties.setDelay(10000);
            rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
                    PriorityConfig.PRIORITY_ROUTING_KEY, MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build(), correlationData);
        }
    }
}

先不启动消费者-发送消息

在这里插入图片描述
消息成功到达Exchange,以为开启了ConfirmCallback
在这里插入图片描述
队列也有了10条
在这里插入图片描述

消费者
 /**
     * 队列优先级消费者
     */
    @RabbitListener(queues = PriorityConfig.PRIORITY_QUEUE_NAME)
    public void receivePriorityQueue(Message message, Channel channel ) throws IOException, InterruptedException {
        String msg = new String(message.getBody());
        if(msg.equals("info5")){
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            log.info("队列优先级消费者拒绝接收:{}",msg);
        }else{
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            log.info("消费者接收到-有队列优先级的消息:{}", msg);
        }
    }
启动消费者-接收消息

在这里插入图片描述
接收的消息按顺序接收

惰性队列

使用场景

在这里插入图片描述

两种模式

在这里插入图片描述

内存开销对比

在这里插入图片描述

springboot整合RabbitMQ发送Json

配置

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQJsonConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

对象

package com.wh.controller;

public class Type {
    private String type;

    private String value;

    public Type() {
    }

    public Type(String type, String value) {
        this.type = type;
        this.value = value;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Type{" +
                "type='" + type + '\'' +
                ", value='" + value + '\'' +
                '}';
    }
}

消费者

    @RabbitListener(queues = PriorityConfig.PRIORITY_QUEUE_NAME)
    public void receivePriorityQueue(Type type, Channel channel ) throws IOException, InterruptedException {
        System.out.println("json接收:"+type);
    }

简单发送

生产者

   @GetMapping("/sendMsg")
    public void sendMsg() {
        Type type = new Type("info", "11");
        String s = JSON.toJSONString(type);
        log.info("s{},{}", 11, s);
        rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
                PriorityConfig.PRIORITY_ROUTING_KEY, type);
}

结果

在这里插入图片描述
在这里插入图片描述

复杂发送

生产者

    @GetMapping("/sendMsg")
    public void sendMsg() {
        for (int i = 1; i < 11; i++) {
            Map<String, String> mqMsg = new HashMap<>();
            int finalI = i;
            mqMsg.put("type", "info");
            mqMsg.put("value", i + "");
            //为了设置CorrelationData 的id ==>confirm 交换机确认接收
            CorrelationData correlationData = new CorrelationData();
            correlationData.setId(i + "");
            //为了设置队列优先级,延迟队列时间等等
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setPriority(finalI);
            messageProperties.setDelay(10000);
            String s = JSON.toJSONString(mqMsg);
            // s1,{"info":"1"}
            log.info("s{},{}", i, s);
            rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
                    PriorityConfig.PRIORITY_ROUTING_KEY, MessageBuilder.withBody(s.getBytes())
                            .andProperties(messageProperties).build(), correlationData);
        }
}

结果

Content-Type:application/octet-stream,从字面意思得知,只可以上传二进制数据
在这里插入图片描述
在这里插入图片描述

友情参考

感谢B站的尚硅谷:
https://www.bilibili.com/video/BV1cb4y1o7zz?p=84

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-06 09:53:50  更:2021-08-06 09:55:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/19 4:41:03-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码