IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ学习笔记 -> 正文阅读

[大数据]RabbitMQ学习笔记

本文基于尚硅谷发布于 B 站的《尚硅谷2021最新版 RabbitMQ 教程丨快速掌握 MQ 消息中间件》教学视频整理而得

视频地址: https://www.bilibili.com/video/BV1cb4y1o7zz?p=1

尚硅谷官网: http://www.atguigu.com/

什么是 MQ

MQ(message queue,消息队列),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常见的上下游【逻辑解耦+物理解耦】的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不用依赖其他服务

简单来说,A 向 B 发送消息,则 A 可以称之为【上游】,B 则理所当然称之为【下游】

为什么使用 MQ

流量消峰

现在有一商品订单服务器,可以接受的并发峰值为 10000 / s,某一时刻,商品促销,并发峰值达到 20000 / s,此时,我们有两种选择

在达到 10000 / s 的峰值时,限制用户下单,用户体验相当不好

放任不管,则用户访问超过峰值,商品订单服务器宕机

以上两种选择不论是对用户还是对企业自身,都没有找到一个比较好的平衡点,现在,MQ 的作用就能体现出来,如图

使用 MQ 作为中间通道,将用户的访问存储在消息队列中作为缓冲,分批次发送给订单服务器进行处理,其【先进先出】的理念,保证了用户访问的先后,最终达到,既不必限制用户访问,也不会造成订单服务器宕机的效果

但此种方式也有一定缺点,因为是分批次处理的,造成用户的等待时间相比正常情况下,较长,但是有舍必有得,通过 MQ 的方式来处理大量并发,也算是找到了用户和服务的良好平衡点,毕竟:等待下单总比没法下单要好得多

应用解耦

依然以电商系统为例,此时,有商品订单服务完成订单创建,需要调用【库存系统(削减库存)】【物流系统(生成物流信息)】【支付系统(生成支付信息)】,若此时,某一子系统出现故障,如库存系统出现故障,需要进行紧急修复,在没有使用 MQ 的情况下,系统之间耦合,直接进行相互调用,因为库存系统的故障,导致整个下单操作异常,影响用户下单

但若是使用到了 MQ,服务之间相互调用时,只需要将请求传递给 MQ,通过 MQ 来向其他服务发送相关请求消息,期间若有服务出现异常,可以将相互的请求消息缓存在 MQ 中,待对应的服务恢复后再进行相关消息的处理,这样,服务之间就进行了解耦,如下图

异步处理

有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api (回调),B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callbackapi 。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息

MQ 的分类

ActiveMQ

优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

缺点:官方社区现在对ActiveMQ5.x维护越来越少,高吞吐量场景较少使用。尚硅谷官网视频: http://www.gulixueyuan.com/course/322

Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳

优点: 性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。时效性ms级可用性非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方KafkaWeb管理界面Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点:Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢

RocketMQ

RocketMQ出自阿里巴巴的开源产品,用Java语言实现,在设计时参考了Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景

优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到0丢失,MQ功能较为完善,还是分布式的,扩展性好,支持10亿级别的消息堆积,不会因为堆积导致性能下降,源码是java我们可以自己阅读源码,定制自己公司的MQ

缺点:支持的客户端语言不多,目前是java及c++,其中c++不成熟;社区活跃度一般,没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

RabbitMQ

2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一

优点:由于erlang语言的高并发特性,性能较好;吞吐量到万级,MQ功能比较完备,健壮、稳定、易用、跨平台、支持多种语言如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高,官网:https://www.rabbitmq.com/news.html

缺点:商业版需要收费,学习成本较高

MQ 的选择

Kafka

Kafka 主要特点是基于 Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了

RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ

RabbitMQ

结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ

RabbitMQ

RabbitMQ 的概念

RabbitMQ是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据

核心概念

生产者

产生数据发送消息的程序是生产者

交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定

队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式

消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者

RabbitMQ 六大模式

上图模式分别为: 简单模式 | 工作模式 | 发布订阅模式 | 路由模式 | 主题模式 | 发布确认模式

工作原理

Broker:接收和分发消息的应用,RabbitMQServer 就是 MessageBroker

Virtualhost:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQserver 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCPConnection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread 创建单独的 channel 进行通讯,AMQPmethod 包含了 channelid 帮助客户端和 messagebroker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立TCPconnection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routingkey,分发消息到 queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routingkey,Binding 信息被保存到exchange 中的查询表中,用于 message 的分发依据

简单总结

消息生产者:

消息生产者通过 Connection 与 Broker 进行连接, Connection 中有多个 Channel,Channel 是 Connection 内部的连接,通常由每个线程单独创建一个 Channel 与 Broker 进行通讯,Channel 之间完全隔离

生产者的消息通过 Connection 到达 Broker 后,首先会遇上 Exchange,Exchange 根据 routingkey 将消息分发到对应的 Queue 中去,Queue 中的消息等待消费者取走

消息消费者:

消费者依然通过 Connection 来连接到 Broker,Broker 则将 Queue 中的数据分发到对应的消费者中去

安装

官网地址:https://www.rabbitmq.com/download.html2

安装 elang 运行环境

rpm -ivh erlang-21.3-1.el7.x86_64.rpm

安装相关网络工具

yum install socat -y

安装 rabbitmq

rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

常用命令

# 执行如下命令,启动 rabbitmq 中的插件管理
rabbitmq-plugins enable rabbitmq_management
	
	出现如下说明启动成功:
    Enabling plugins on node rabbit@localhost:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@localhost...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch

    set 3 plugins.
    Offline change; changes will take effect at broker restart.

# 启停 rabbitmq 的服务
	systemctl start rabbitmq-server
	systemctl restart rabbitmq-server
	systemctl stop rabbitmq-server
	

# 查看服务状态(如下)
systemctl status rabbitmq-server
	rabbitmq-server.service - RabbitMQ broker
	Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
    Active: active (running) since 三 2019-09-25 22:26:35 CST; 7s ago
   Main PID: 2904 (beam.smp)
     Status: "Initialized"
     CGroup: /system.slice/rabbitmq-server.service
             ├─2904 /usr/lib64/erlang/erts-10.4.4/bin/beam.smp -W w -A 64 -MBas ageffcbf -MHas ageffcbf -
             MBlmbcs...
             ├─3220 erl_child_setup 32768
             ├─3243 inet_gethost 4
             └─3244 inet_gethost 4
      .........

默认账号密码

account: guest
password: guest

使用此账号会出现权限问题 User can only log in via localhost

这是因为 rabbitmq 从 3.3.0 开始禁止 使用 guest / guest 权限通过除 localhost 外的访问

只针对于 guest 用户,解决办法为
新建一个用户,授予相关权限

或者

复制配置文件
cp /usr/share/doc/rabbitmq-server-3.7.18/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
修改配置文件
vim /etc/rabbitmq/rabbitmq.config
%%{loopback_users, []},
修改为{loopback_users, []}

添加一个新的用户

rabbitmqctl add_user admin

回车后,会提示设置该用户的密码

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
用户 admin 具有 / 这个 virtualhost 中所有资源的配置 | 写 | 读权限

设置后,就可以使用 admin(用户名) 以及刚刚设置的密码进行 web 端的登陆

查看当前用户和角色

rabbitmqctl list_users

HelloWorld

使用 Java 代码的方式实现简单的【消息生产者】和【消费者】

创建 Maven 工程

导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dhj.demo</groupId>
    <artifactId>rabbitmq-demo</artifactId>
    <version>1.0</version>

    <!--指定 jdk 编译版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>

        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>

        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>
</project>

简单模式

消息生产者将消息放入队列

消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)

应用场景:聊天(中间有一个过度的服务器)

图示

生产者

package com.dhj.demo.hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * mq 消息队列 消息生产者
 */
public class Producer {
    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 发送消息
    public static void main(String[] args) {
        try {// 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();

            // 设置工厂 ip ,用于连接 mq 的队列
            factory.setHost("192.168.116.130");

            // 设置用户名 & 密码
            factory.setUsername("admin");
            factory.setPassword("111111");

            // 创建连接
            Connection connection = factory.newConnection();

            // 创建信道
            Channel channel = connection.createChannel();

            /*
             创建一个队列,设置队列的相关声明
             参数1 队列名称
             参数2 队列中的消息是否持久化,默认情况下,消息存储在内存中(false) | 持久化时,消息存储在磁盘中(true)
             参数3 该队列是否只供一个消费者进行消费(是否消息共享),默认为 false 不进行共享
             参数4 是否自动删除 最后一个消费者断开连接后,该队列是否自动删除,默认为 false 不自动删除
             参数5 其他参数,如 延迟消息,死信消息等,初学,使用 null
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 发消息
            String message = "Hello World!";

            /*
            使用信道发送消息
            参数1 交换机,若没有设置,可以使用 "" 代替,不能使用 null
            参数2 路由key,实际上就是队列名
            参数3 其他参数
            参数4 消息体,参数类型为一个字节数组
             */
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            System.out.println("-----消息发送完毕!-----");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者

package com.dhj.demo.hello;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * mq 消息队列 消费者
 */
public class Consumer {

    // 队列名称,表示接收此队列的消息
    public static final String QUEUE_NAME = "hello";

    // 接受消息
    public static void main(String[] args) {

        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();

            // 设置连接参数
            factory.setHost("192.168.116.130");
            factory.setUsername("admin");
            factory.setPassword("111111");

            // 获取连接
            Connection connection = factory.newConnection();

            // 创建信道
            Channel channel = connection.createChannel();

            // 声明未成功消费的回调,使用 lambda 表达式的匿名内部类进行声明,重写其中的 handle 方法
            DeliverCallback deliverCallback = (consumerTag, message) -> {

                /*
                 直接打印消息,也就是被消费者消费的消息
                 参数 message 是一个 Delivery(交付) 对象
                 可以使用 String 构造器,将 message 的消息体转化为字符串,也就是具体的消息字符串
                 */
                System.out.println(new String(message.getBody()));
            };

            // 声明消费者取消消费的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                // 取消消息
                System.out.println("消费者消息消费被取消");
            };

            /*
             消费者接收消息
             参数1 消费那个队列的消息(队列名)
             参数2 消费成功之后,是否要自动应答(true)
             参数3 消费者未成功消费的回调
             参数4 消费者取消消费的回调
             */
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

WorkeQueues

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,简单来说,就是一个生产者有多个消费者消费消息

轮询分发消息

轮询针对的是有多个消费者线程的情况下

消息产生者将消息放入队列,消费者可以有多个:C1、C2 ,二者同时监听同一个队列,C1、C2 共同争抢当前的消息队列内容谁先拿到,谁负责消费消息,当前抢到消息的消费者,下一次需要等待其他消费者进行消费,这是工作队列采用的其中一种分发模式,另一种为【不公平分发】

关于工作队列 https://www.cnblogs.com/wy697495/p/9611648.html

应用场景举例:大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

编写工具类

/**
 * RabbitMQ 工具类
 */
public abstract class RabbitMQUtils {

    // 得到一个连接的 channel
    public static Channel getChannel() {

        try {
            ConnectionFactory factory = new ConnectionFactory();

            factory.setHost("192.168.116.130");
            factory.setUsername("admin");
            factory.setPassword("111111");

            Connection connection = factory.newConnection();

            return connection.createChannel();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // 简单批量发送指定数量的指定消息
    public static void batchSendMsg(String msg, int msgCount, Channel channel, String queue_name) {
        try {
            for (int i = 0; i < msgCount; i++) {
                channel.basicPublish("", queue_name, null, msg.getBytes());
            }
            System.out.println("批量发送成功!");
        } catch (Exception e) {
            System.out.println("批量发送消息异常!");
            e.printStackTrace();
        }
    }
}

编写常量类

public abstract class RabbitMQConst {
    public static int num = 0;// 用于消息对单个消费者线程消费的消息进行计数

    public static final String QUEUE_NAME_HELLO = "hello";

    // 声明未成功消费的回调,使用 lambda 表达式的匿名内部类进行声明,重写其中的 handle 方法
    public static DeliverCallback deliverCallback = (consumerTag, message) -> {

                /*
                 直接打印消息,也就是被消费者消费的消息
                 参数 message 是一个 Delivery(交付) 对象
                 可以使用 String 构造器,将 message 的消息体转化为字符串,也就是具体的消息字符串
                 */
        num++;
        System.out.println(new String(message.getBody()) + "-" + num);
    };

    // 声明消费者取消消费的回调
    public static CancelCallback cancelCallback = (consumerTag) -> {
        // 取消消息
        System.out.println("消费者消息消费被取消");
    };
}

编写消费者(工作线程)

/**
 * 一个工作线程,相当于消费者 常量
 */
public class Worker {

    // 接收消息
    public static void main(String[] args) {

        try {
            // 获取信道
            Channel channel = RabbitMQUtils.getChannel();

            if (channel != null) {
                channel.basicConsume(RabbitMQConst.QUEUE_NAME_HELLO, true, RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);
                System.out.println("c1 运行...");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行当前 worker ,进行如下配置,便可以再起一个 worker

编写生产者

/**
 * 工作模式 生产者
 */
public class WorkerProduce {
    public static void main(String[] args) {

        try {
            Channel channel = RabbitMQUtils.getChannel();

            if (channel != null) {
                // 创建队列,进行相关申明
                channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

                RabbitMQUtils.batchSendMsg("Hello", 100, channel, RabbitMQConst.QUEUE_NAME_HELLO);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

首先启动两个 Worker 进程等待消费消息,再启动 WorkerProduce 生产消息,从生产者的代码中可以看出,生产了一百条消息,在控制台查看两个消费者线程消费的情况,各自消费了 50 个消息,且轮询的进行消费(c1 > c2 > c1 > c2 > c1 …),某一个消费者线程进行消费后,下一次就该其他消费者线程消费,多个线程之间相互争抢但不重复消费

消息应答

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。rabbitmq 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉rabbitmq它已经处理了,rabbitmq 可以把该消息删除了

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡

因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

消息应答的方法

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

basic.nack 方法为不确认 deliveryTag 对应的消息,第二个参数是否应用于多消息,第三个参数是否 requeue,与 basic.reject 区别就是同时支持多个消息,可以 nack 该消费者先前接收未 ack 的所有消息。nack 后的消息也会被自己消费到。

Channel.basicReject(用于否定确认)

basic.reject 方法拒绝 deliveryTag(该消息的索引) 对应的消息,第二个参数是否 requeue,true 则重新入队列,否则丢弃或者进入死信队列。

Channel.basicRecover(true)

basic.recover 是否恢复消息到队列,参数为是否 requeue,true 则重新入队列,并且尽可能的将之前 recover 的消息投递给其他消费者消费,而不是自己再次消费。false 则消息会重新被投递给自己

Multiple

multiple 的 true 和 false 代表不同意思

true 代表批量应答 channel 上未应答的消息比如说 channel 上有传送 tag 的消息 5、6、7、8 当前 tag 是 8 那么此时 5-8 的这些只要是还未应答的所有消息都会被确认收到消息应答

false 同上面相比只会应答 tag=8 的消息 5、6、7 这样的其他未被应答的消息不会被确认收到消息应答

批量应答不推荐使用,因为可能存在其他未应答的消息并没有被处理完,但是却被批量应答给处理了,导致消息的丢失

重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息

代码演示

生产者

/**
 * 消息应答 生产者
 */
public class MsgRespProducer {

    public static void main(String[] args) {
        try {

            Channel channel = RabbitMQUtils.getChannel();

            if (channel != null) {
                channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

                channel.basicPublish("", RabbitMQConst.QUEUE_NAME_HELLO, null, "Java".getBytes());
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者1

/**
 * 不自动批量应答 消费者
 */
public class WorkerNoAuto1 {
    public static void main(String[] args) {
        try {
            Channel channel = RabbitMQUtils.getChannel();

            if (channel != null) {
                System.out.println("c1 运行...");
                // (手动应答模式)声明未成功消费的回调,使用 lambda 表达式的匿名内部类进行声明,重写其中的 handle 方法
                DeliverCallback deliverCallback = (consumerTag, message) -> {

                    // 休眠 1 秒
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(new String(message.getBody()));

                    /*
                    肯定应答
                    参数1 该条消息对应的索引 或者 tag 标记
                    参数2 是否批量应答
                     */
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                };
                
                // 参数2 false 不自动应答
                channel.basicConsume(RabbitMQConst.QUEUE_NAME_HELLO, false, deliverCallback, RabbitMQConst.cancelCallback);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者2

/**
 * 不自动批量应答 消费者
 */
public class WorkerNoAuto2 {
    public static void main(String[] args) {
        try {
            Channel channel = RabbitMQUtils.getChannel();

            if (channel != null) {
                System.out.println("c2 运行...");
                // (手动应答模式)声明未成功消费的回调,使用 lambda 表达式的匿名内部类进行声明,重写其中的 handle 方法
                DeliverCallback deliverCallback = (consumerTag, message) -> {

                    // 休眠 30 秒
                    try {
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(new String(message.getBody()));

                    /*
                    肯定应答
                    参数1 该条消息对应的索引 或者 tag 标记
                    参数2 是否批量应答
                     */
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                };
                
                // 参数2 false 不自动应答
                channel.basicConsume(RabbitMQConst.QUEUE_NAME_HELLO, false, deliverCallback, RabbitMQConst.cancelCallback);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

先启动生产者,发送消息

再先启动消费者2,后启动消费者1,二者都开启了手动应答

现在,消费者2抢到了队列中的消息,但是,由其消费消息的速度过慢,我们可以将该消费者2线程暂时关闭,因为消费者2没有发送 channel.basicAck() 消息确认,所以消息重新入队,此时,重新入队的消息被消费者1监听到并消费

持久化

默认情况下,rabbitmq 服务因为某些原因被意外关闭或重启时,将会忽视队列和消息,为了确保 rabbitmq 服务在关闭时,也不丢失消息 | 队列,需要开启 rabbitmq 的消息 | 队列持久化

队列持久化

之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把durable 参数设置为持久化

// 参数2 代表是否开启队列的持久化 true(开启)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

注意:如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现如下报错

开启队列持久化后,web 管理页面效果如下

消息持久化

要想让消息实现持久化需要在消息生产者修改代码,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 这个属性

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

但是,消息标记为持久化并不能完全保证不会丢失消息,尽管它告诉 rabbitmq 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了

不公平分发

不论是前面的 WorkerQueue 还是消息应答,使用的都是轮询的消息分发策略,这有一个问题,如果其中的一个或多个工作线程的消息消费策略大于其他的工作线程,而又采取的是轮询的分发策略,导致效率较高的工作线程等待效率较低的工作线程,拖慢了整体的运行效率

为了避免这钟情况,我们可以使用不公平分发,通俗的讲就是:能者多劳

开启不公平分发

/* 
消费者端设置
设置为 0 轮询分发
设置为 1 不公平分发
*/
channel.basicQos(1);

开启后,当消息到达一个工作线程A,工作线程A效率较低,还没有处理完消息或没有进行应答,那么,下一个消息将会到达其他空闲的工作线程,若下下个消息到达后,工作线程A依然没有处理完消息或进行应答,则消息继续发送给其他空闲的工作线程

预取值

rabbitmq 的消息发送是异步的,消费者的手动确认本质上也是异步的,这就说明 channel 中肯定不止一条消息且还未被消费者确认,形成了一个消息的缓冲区,增加了内存的消耗,此时,我们希望限制此缓冲区的大小,这就时使用到了预取值,通过设置预取值的大小,限制消息发送到工作线程时的消息数

如,工作线程A的预取值为 4,若信道中最大的未被确认的消息数也为 4,达到了预取值,rabbitmq 将不会再向工作线程A的信道中发送消息,若现在工作线程A确认一条消息,则 rabbitmq 监听到该 ack 的肯定应答,此时,信道中消息数小于预取值了,rabbitmq 会再向工作现场A发送一条消息

如下图所示,prefetch 代表预取值,

预取值的设置

/* 
消费者端设置
设置为 0 轮询分发
设置为 1 不公平分发
设置的值大于 1 则为预取值
*/
channel.basicQos(2);

发布确认

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出

broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息

发布确认是针对生产者端的,只是在确保消息发布出去后,可以用于消息的持久化或者防止消息的丢失,而不是说等待消费者确认后才发送消息,注意区分,发布确认只针对于生产者消息的发布有以及确认发布成功

开启发布确认

开启发布确认

// 开启发布确认(生产者端)
channel.confirmSelect();

单个确认发布

生产者发送一条消息,那么就等待该消息的确认,若超出一定时间没有收到该消息的确认,则会抛出异常,缺点在于发送速度慢,某一天消息未被确认,会阻塞后面所有的消息,此种方式提供了每秒不超过百条的吞吐量,对于某些应用程序足够了

// 单个确认模式 耗时: 3041ms
public static void singConfirm() throws Exception {
    Channel channel = RabbitMQUtils.getChannel();

    if (channel != null) {
        // 申明队列
        channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

        // 开启发布确认
        channel.confirmSelect();

        // 记录开始时间
        long start = System.currentTimeMillis();

        // 发送一千条消息
        for (int i = 1; i <= NUM; i++) {
            channel.basicPublish("", RabbitMQConst.QUEUE_NAME_HELLO, null, UUID.randomUUID().toString().getBytes());

            // 单个消息马上进行发布确认
            if (channel.waitForConfirms()) {
                System.out.println("消息 " + i + " 发送成功!");
            }
        }

        // 记录结束时间
        long end = System.currentTimeMillis();
        System.out.println("单个确认模式,消息发送完毕!耗时:" + (end - start));
    }
}

批量发布确认

先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布

    // 批量确认模式 耗时: 768ms
    public static void batchConfirm() throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            // 申明队列
            channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

            // 开启发布确认
            channel.confirmSelect();

            // 记录开始时间
            long start = System.currentTimeMillis();

            // 发送一千条消息
            for (int i = 1; i <= NUM; i++) {
                channel.basicPublish("", RabbitMQConst.QUEUE_NAME_HELLO, null, UUID.randomUUID().toString().getBytes());

                // 如果达到批量确认的阈值
                if (i % BATCH_NUM == 0) {
                    // 消息发布确认
                    channel.waitForConfirms();
                }
            }

            // 记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("批量确认模式,消息发送完毕!耗时:" + (end - start));
        }
    }

异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的

在异步模式下,生产者只需要专心发消息,不用管消息是否收到,因为 broker 会异步的调用 ackCallback | nackCallback 方法来回应消息是否发送成功,未发送成功的消息,异步的 nackCallback 在调用时,会带上未发送成功消息的 key(序号) | value(消息内容) 生产者可以根据 k:v 重新发送消息,既保证发送的效率,也保证了消息的不丢失

代码实现

    // 异步确认模式 耗时 600
    public static void asynConfirm() throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            // 申明队列
            channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

            // 开启发布确认
            channel.confirmSelect();


            // 记录开始时间
            long start = System.currentTimeMillis();


            // 消息肯定回调
            ConfirmCallback ackCallback = new ConfirmCallback() {

                /*
                参数1 消息的标记
                参数2 是否为批量确认
                 */
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("确认成功消息的编号:" + deliveryTag);
                }
            };
            // 确认失败回调
            ConfirmCallback noCallback = new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("确认失败消息的编号: " + deliveryTag);
                }
            };

            /*
            准备消息监听器 监听消息的成功失败
            注:监听为异步的
            参数1 监听成功的消息
            参数2 监听失败的消息
             */
            channel.addConfirmListener(ackCallback, noCallback);

            for (int i = 1; i <= NUM; i++) {
                channel.basicPublish("", RabbitMQConst.QUEUE_NAME_HELLO, null, UUID.randomUUID().toString().getBytes());
            }


            // 记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("异步发布确认模式,消息发送完毕!耗时:" + (end - start));
        }
    }

运行代码,可以观察到如下现象

确认成功消息的编号:256
确认成功消息的编号:257
确认成功消息的编号:259
确认成功消息的编号:260
异步发布确认模式,消息发送完毕!耗时:600
确认成功消息的编号:292
确认成功消息的编号:295
确认成功消息的编号:301
确认成功消息的编号:303

在消息发送完毕的打印语句前后,依然有消息确认的打印提示,这进一步说明了消息确认的异步处理,消息生产者只需要发消息而不用管是否发送成功,由监听器异步的回调返回消息的发送结果

以上只是演示异步确认的现象,而对于未确认的消息,我们需要有一个处理的方式

代码演示

    // 异步确认模式 耗时 600
    public static void asynConfirm() throws Exception {

        // 提供一个高并发哈希表 线程安全有序 用于消息发布和异步确认线程之间的数据通信
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();

        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            // 申明队列
            channel.queueDeclare(RabbitMQConst.QUEUE_NAME_HELLO, false, false, false, null);

            // 开启发布确认
            channel.confirmSelect();


            // 记录开始时间
            long start = System.currentTimeMillis();


            // 消息确认回调
            ConfirmCallback ackCallback = new ConfirmCallback() {

                /*
                参数1 消息的序号
                参数2 是否为批量确认
                 */
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {

                    // 如果是一个批量确认的消息
                    if (multiple) {
                        /*
                        按照 deliveryTag 删除当前批量确认消息以及小于等于当前批量确认消息 deliveryTag 的数据
                         */
                        /*
                         headMap() 返回此映射中的部分视图集合,其键值严格小于参数 deliveryTag
                         若 参数2 为 true 则其键值小于或等于 deliveryTag
                         */
                        ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTag, true);

                        // 清空集合中已经被批量确认的消息
                        confirmMap.clear();
                    } else {
                        // 直接删除掉当前已经确认的消息
                        map.remove(deliveryTag);
                    }
                }
            };
            // 确认失败回调
            ConfirmCallback noCallback = new ConfirmCallback() {
                @Override
                public void handle(long deliveryTag, boolean multiple) throws IOException {

                    if (map.isEmpty()) {
                        return;
                    }
                    // 打印确认失败的消息
                    Set<Map.Entry<Long, String>> entries = map.entrySet();
                    for (Map.Entry<Long, String> next : entries) {
                        System.out.println(next.getKey() + ":" + next.getValue());
                    }
                }
            };

            /*
            准备消息监听器 监听消息的成功 | 失败
            注:监听为异步的
            参数1 监听成功的消息
            参数2 监听失败的消息
             */
            channel.addConfirmListener(ackCallback, noCallback);

            for (int i = 1; i <= NUM; i++) {

                String msg = UUID.randomUUID().toString();

                /*
                 记录下所有要发送的消息
                 channel.getNextPublishSeqNo();获取下一次要发布消息的序号
                 */
                map.put(channel.getNextPublishSeqNo(), msg);

                channel.basicPublish("", RabbitMQConst.QUEUE_NAME_HELLO, null, msg.getBytes());
            }

            // 记录结束时间
            long end = System.currentTimeMillis();
            System.out.println("异步发布确认模式,消息发送完毕!耗时:" + (end - start));
        }
    }

以上,首先提供了一个线程安全高并发的 ConcurrentSkipListMap 哈希表,用于在消费者所属线程和异步线程之间进行通信,消费者每次发送消息,都将其存在 ConcurrentSkipListMap 集合中,当确认成功消息,则从集合中删除掉该消息,最终,集合里面剩下的,就是未确认成功的消息,做到了对未处理消息的一个基本处理

三者对比

单独发布消息:同步等待确认,简单,但吞吐量非常有限

批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题

异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些

交换机

交换机初识

在之前的一系列操作中,都是将一个消息发送出来,消费者进行消费,因为一个队列中的消息,只能被消费一次,到导致一个消费者,对于同一个消息,也只能消费一次,此种被称为简单模式或工作模式,如下图

现有一需求,想要将一个消息让给同一个消费者消费两次或多次,此时,就可以使用到交换机,生产者依然只生产一次消息,通过交换机,将该消息发送到两个队列,队列中该消息也依然只出现了一次,但是两个队列都将消息发布到了同一个消费者,实现了同一个消息被一个消费者消费了两次的效果,如下图

交换机的概念(Exchanges)

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange)

交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定

Exchanges 类型

直接类型(direct) | 主题类型(topic) | 标题类型(headers) | 扇出类型(fanout),还有一种【无名类型】通常使用 “” 来进行表示,实际上使用的是默认的 AMQP default 交换机

临时队列

在之前使用的队列中,都是具有特定名称的队列,如 hello

每当连接到 RabbitMQ 服务时,我们都需要一个全新的队列,一旦我们断开连接,队列就被删除,这里,就使用到了临时队列

创建一个临时队列

String queueName = channel.queueDeclare().getQueue();

创建后的队列,如下

绑定(Bindings)

使得某个交换机与某个队列通过 RoutingKey 对应,就叫做绑定

演示

首先创建一个队列

创建一个交换机

交换机与队列绑定

效果如下

Fanout

将接收到的消息广播到所有与扇出类型的交换机绑定的队列,这种现象叫做扇出

代码演示

生产者

public class EmitLog {
    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            for (int i = 0; i < 100; i++) {
                // 发布消息到指定交换机 logs
                channel.basicPublish("logs", "", null, "hello".getBytes());
            }
        }

    }
}

消费者 01

public class ReceiveLogs01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            /*
             声明交换机
             参数1 交换机的名称
             参数2 交换机的类型
             */
            channel.exchangeDeclare("logs", "fanout");

            /*
             声明临时队列
             队列的名称随机
             当消费者断开与队列的连接时 队列自动删除
             */
            String queue = channel.queueDeclare().getQueue();

            /*
            绑定队列与交换机
            参数1 队列名
            参数2 交换机名称
            参数3 routingKey 可以不写 置为 ""
             */
            channel.queueBind(queue, "logs", "");
            System.out.println("ReceiveLogs01 等待接收消息...");

            // 接收消息 为了方便 这里使用自动应答
            channel.basicConsume(queue,true, RabbitMQConst.deliverCallback,RabbitMQConst.cancelCallback);
        }
    }
}

消费者 02

public class ReceiveLogs02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            channel.exchangeDeclare("logs", "fanout");

            String queue = channel.queueDeclare().getQueue();

            channel.queueBind(queue, "logs", "");
            System.out.println("ReceiveLogs02 等待接收消息...");

            // 接收消息 为了方便 这里使用自动应答
            channel.basicConsume(queue, true, RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);
        }
    }
}

启动两个消费者线程,再启动生产者

观察发现,生产者生产的消息发通过交换机,分别发送给与交换机绑定过的所有消费者

Direct

Fanout 是将消息广播给所有与其绑定过的队列,现在我们希望有个日志系统,只将一些特别的日志,例如严重报错的日志信息存储起来,同时仍然能在控制台打印日志信息

Fanout 这种交换类型并不能给我们带来很大的灵活性,它只能进行无意识的广播,在这里我们将使用 Direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去

多重绑定

在 Direct 模式下,绑定的多个routingKey 一样,这种情况类似于 Fanout 模式,称为多重绑定

代码演示

生产者

/*
direct 交换机 生产者
 */
public class DirectLogs {
    private static final String INFO = "info";
    private static final String ERR = "err";
    private static final String WARNING = "warning";

    public static void main(String[] args) throws Exception {

        Map<String, String> routingKeyMap = new HashMap<>();


        routingKeyMap.put("info", "info信息");
        routingKeyMap.put("err", "err信息");
        routingKeyMap.put("warning", "warning信息");


        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            for (int i = 0; i < 1000; i++) {
                // 为了演示效果 自定义一个规则 向指定的 routingKey 发送消息
                if (System.currentTimeMillis() % 2 == 0) {
                    channel.basicPublish("direct_logs", INFO, null, routingKeyMap.get(INFO).getBytes());
                    channel.basicPublish("direct_logs", WARNING, null, routingKeyMap.get(WARNING).getBytes());
                } else {
                    channel.basicPublish("direct_logs", ERR, null, routingKeyMap.get(ERR).getBytes());
                }
            }
        }
    }
}

消费者1

/*
direct 交换机 消费者01
 */
public class ReceiveLogsDirect01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            /*
            申明直接交换机
            名称为 direct_logs
            使用 BuiltinExchangeType 枚举类选择交换机类型
             */
            channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);

            // 申明队列
            channel.queueDeclare("console", false, false, false, null);

            /*
             该消费者 绑定队列,routingKey 为 info | warning
             */
            channel.queueBind("console", "direct_logs", "info");
            channel.queueBind("console", "direct_logs", "warning");

            // 接收消息
            channel.basicConsume("console", false,
                    RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);

            System.out.println("日志1 接收消息...");
        }
    }
}

消费者2

/*
direct 交换机 消费者02
 */
public class ReceiveLogsDirect02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 申明队列
            channel.queueDeclare("disk", false, false, false, null);

            // 绑定队列 routingKey 为 err
            channel.queueBind("disk", "direct_logs", "err");

            // 接收消息
            channel.basicConsume("disk", false,
                    RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);

            System.out.println("日志2 接收消息...");
        }
    }
}

首先启动消费

在消费者1中,定义了 direct_logs 交换机,定义了 console 队列,并且消费者1通过 routingKey:waring | info 与交换机 direct_logs 和队列 console 进行了绑定

在消费者2中,定义了队列 disk 通过 routingKey:err 与交换机 direct_logs 和队列 disk 进行了绑定

然后启动生产者

在生产者中,直接可以发送消息,因为队列和交换机已经在消费者中定义好了,生产者通过指定交换机,以及在不同的情况下,指定发送消息的 routingKey ,因为消费者与队列,就是通过 routingKey 与队列进行了绑定,而交换机又通过 routingKey 找到对应的队列将消息传递进去,由此,实现了根据条件通过指定不同的 routingKey 将消息发送到不同的消费者

图示

Topic

Fanout 是无选择的对绑定了 routingKey 的队列进行发送消息,Direct 则是根据 routingKey 来有选择性的向绑定的队列发送消息

但若是有这样一种情况,如下图

现在使用的是 direct 交换机,只能针对性的根据 routingKey 分别的发送消息,如果要实现既向 orange 又向 black | green 发送消息(同时路由多个队列),direct 就无法做到,因为他们的 routingKey 不同,代码层面无法实现,虽然我们可以捆绑多个,但是不能保证可以将消息同步的发送到每个捆绑的队列(消费者线程会争夺)

这时,我们就需要使用到 topic

Topic 使用规则

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求

它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:stock.usd.nyse 、 nyse.vmw 、 quick.orange.rabbit 这种类型的,当然这个单词列表最多不能超过 255 个字节

在这个规则列表中,其中有两个替换符是大家需要注意的:* 可以代替一个单词, # 可以替代零个或多个单词

示例

如图

Q1 绑定的是:中间带 orange 带 3 个单词的字符串 (*.orange.*)

Q2 绑定的是:最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit),第一个单词是 lazy 的多个单词 (lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit 被队列 Q1 | Q2 接收到
lazy.orange.elephant 被队列 Q1 | Q2 接收到
quick.orange.fox 被队列 Q1 接收到
lazy.brown.fox 被队列 Q2 接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配 Q2

当队列绑定关系是下列这种情况时需要引起注意

当一个队列绑定键是 # 那么这个队列将接收所有数据,就有点像 fanout 了

如果队列绑定键当中没有 #* 出现,那么该队列绑定类型就是 direct 了

通过如上规则的设置,就能在发送消息时,给定一个 routingKey 匹配到多个队列绑定的 topic 类型交换机中申明的 routingKey 匹配规则,实现一个 routingKey 匹配多个队列向其同步发送消息的效果

代码演示

消费者01

/*
topic 消费者01
 */
public class TopicConsumer01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            /*
            申明交换机 topicExchange ,类型为 TOPIC
             */
            channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC);

            // 申明队列 Q1
            channel.queueDeclare("Q1", false, false, false, null);

            // 通过 routingKey 绑定队列和交换机
            channel.queueBind("Q1", "topicExchange", "*.orange.*");

            // 接收消息
            channel.basicConsume("Q1", true, RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);

            System.out.println("消费者1接收消息...");
        }
    }
}

消费者02

/*
topic 消费者02
 */
public class TopicConsumer02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            channel.queueDeclare("Q2", false, false, false, null);

            /*
            Q2 绑定了两个 routingKey 匹配规则
             */
            channel.queueBind("Q2", "topicExchange", "*.*.rabbit");
            channel.queueBind("Q2", "topicExchange", "lazy.#");

            // 接收消息
            channel.basicConsume("Q2", true, RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);
            System.out.println("消费者2接收消息...");
        }
    }
}

生产者

/*
Topic 生产者
 */
public class TopicProducer {
    public static void main(String[] args) throws Exception {

        Map<String, String> bindingKeyMap = new HashMap<>();

        bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
        bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
        bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
        bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到");
        bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
        bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");

        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            for (Map.Entry<String, String> entry : bindingKeyMap.entrySet()) {
                channel.basicPublish("topicExchange", entry.getKey(),
                        null, entry.getValue().getBytes());
            }
            System.out.println("消息发送完毕!");
        }
    }
}

运行消费者代码,消费者1 会创建 topicExchange 交换机,申明 Q1 队列,与二者绑定,消费者2 会声明 Q2 队列,与交换机和 Q2 绑定

启动生产者,观察控制台打印,因为两个消费者都通过 topic 模式下的 routingkey 与交换机 topicExchange 绑定,所以有些消息在两个消费者的控制台都出现了,有些消息只在某一个消费者的控制台出现

由此,实现了:通过单个 topic 路由模式下的 routingKey 匹配多个消费者且向多个消费者同时发送消息的效果

死信队列

概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解

一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

死信的来源

消息 TTL(存活时间) 过期

消息队列达到最大长度

消息被拒绝(basic.reject【拒绝应答】 或者 basic.nack【否定应答】) 且 requeue = false【不重新入队】

代码实战

TTL

配置类

/*
统一的配置类 配置队列的申明 交换机的创建 队列绑定等 保持代码的层次分明
 */
public class Config {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 创建 normal_exchange 普通交换机 类型为 direct
            channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);

            // 创建 dead_exchange 死信交换机 类型为 direct
            channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);

            /*
             申明队列 normal_queue 普通队列
             因为涉及到要将死信转移到死信队列 所以此队列的申明需要参数 参数的类型为 Map<String,Object>
             */
            Map<String, Object> paramMap = new HashMap<>();

            // 设置过期时间 单位:毫秒 ,过期时间推荐在生产者端设置 更为灵活 消费者端设置的过期时间不方便变更
            //paramMap.put("x-message-ttl","10000");

            // 设置死信交换机 键 x-dead-letter-exchange 固定
            paramMap.put("x-dead-letter-exchange", "dead_exchange");
            // 设置死信 routingKey ,键 x-dead-letter-routing-key 固定
            paramMap.put("x-dead-letter-routing-key", "lisi");

            // 申明普通队列 设置 paramMap 参数
            channel.queueDeclare("normal_queue",
                    false, false, false, paramMap);

            // 申明 dead_queue 死信队列
            channel.queueDeclare("dead_queue",
                    false, false, false, null);

            // 通过 routingKey: zhangsan,绑定普通队列和普通交换机
            channel.queueBind("normal_queue", "normal_exchange", "zhangsan");

            // 通过 routingKey: lisi,绑定死信队列和死信交换机
            channel.queueBind("dead_queue", "dead_exchange", "lisi");

            // 配置完毕,参数 0 正常退出
            System.exit(0);
        }
    }
}

生产者

/*
死信队列 生产者
 */
public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 死信消息 设置 TTL 时间(time to live) 10000 毫秒
            AMQP.BasicProperties properties = new
                    AMQP.BasicProperties()
                    // 设置过期时间
                    .builder().expiration("10000").build();
            for (int i = 0; i < 100; i++) {
                channel.basicPublish("normal_exchange", "zhangsan", properties, "hello".getBytes());
            }
        }
    }
}

消费者1

/*
死信队列实战
消费者 1
 */
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            // 消费 normal_queue 队列的消息
            channel.basicConsume("normal_queue", RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);

            System.out.println("消费者1 监听消息...");
        }
    }
}

消费者2

/*
死信队列实战
消费者 2
 */
public class Consumer02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {
            // 消费 dead_queue 队列的消息
            channel.basicConsume("dead_queue", RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);
            System.out.println("消费者2 监听消息...");
        }
    }
}

通过 Config 配置类,声明了:普通交换机【normal_exchange】| 死信交换机【dead_exchange】| 普通队列【normal_queue】| 死信队列【dead_queue】

在 normal_queue 申明时,通过参数设置了超过 TTL 时绑定的 normal_exchange 和 routingKey 【lisi】

然后将 normal_exchange 和 normal_queue 通过 routingKey【zhangsan】进行绑定,将 dead_exchange 和 dead_queue通过 routingKey【lisi】进行绑定

在消费者01,接收 normal_queue 中的消息,在消费者02中,接收 dead_queue 中的消息

生产者端,发送消息到 normal_queue 设置 TTL 为 10000ms,关闭消费者01(模拟异常或者掉线情况)

当消息发出且超过 TTL 规定的毫秒数时,原本发送到 normal_queue 的消息,通过 normal_queue 绑定的 dead_exchange 和 routingKey【lisi】将消息发送到了 dead_queue 中保存起来【在配置类中,dead_queue 通过 routingKey:lisi 与 dead_exchange 进行了绑定】,若对应的 dead_queue 有消费者,则该消息被消费

以上是 TTL 过期情况下的死信队列演示

队列达到最大长度

当队列中的消息达到规定的最大长度,则【超出部分的消息】会成为死信消息

修改生产者代码

/*
死信队列 生产者
 */
public class DeadProducer {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 死信消息 设置 TTL 时间(time to live) 10000 毫秒
//            AMQP.BasicProperties properties = new
//                    AMQP.BasicProperties()
//                    // 设置过期时间
//                    .builder().expiration("10000").build();
            for (int i = 0; i < 100; i++) {
                // TTL
                // channel.basicPublish("normal_exchange", "zhangsan", properties, "hello".getBytes());

                // 队列最大长度
                channel.basicPublish("normal_exchange", "zhangsan", null, "hello".getBytes());
            }
        }
    }
}

修改配置类,在申明 normal_queue 时,添加如下参数

paramMap.put("x-max-length", 6); // 队列最大长度

修改消费者1

/*
死信队列实战
消费者 1
 */
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            Thread.sleep(10000);

            // 消费 normal_queue 队列的消息
            channel.basicConsume("normal_queue", RabbitMQConst.deliverCallback, RabbitMQConst.cancelCallback);
            System.out.println("消费者1 监听消息...");
        }
    }
}

让其监听消息时进行休眠,使得来不及消费消息,造成消息拥堵到 normal_queue 中,当达到 6 条的最大队列数时,通过控制台观察到,剩余的消息被放到了 dead_queue 中,且被消费者2 消费了,生产者发送了 100 条消息,消费者2 消费了 96 条,说明在超出最大队列容量时,超出的 96 条消息会被死信队列接收

消息被拒

修改 Config,去掉最大队列长度的限制

/*
统一的配置类 配置队列的申明 交换机的创建 队列绑定等 保持代码的层次分明
 */
public class Config {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 创建 normal_exchange 普通交换机 类型为 direct
            channel.exchangeDeclare("normal_exchange", BuiltinExchangeType.DIRECT);

            // 创建 dead_exchange 死信交换机 类型为 direct
            channel.exchangeDeclare("dead_exchange", BuiltinExchangeType.DIRECT);

            /*
             申明队列 normal_queue 普通队列
             因为涉及到要将死信转移到死信队列 所以此队列的申明需要参数 参数的类型为 Map<String,Object>
             */
            Map<String, Object> paramMap = new HashMap<>();

            // 设置过期时间 单位:毫秒 ,过期时间推荐在生产者端设置 更为灵活 消费者端设置的过期时间不方便变更
            //paramMap.put("x-message-ttl","10000");

            // 设置死信交换机 键 x-dead-letter-exchange 固定
            paramMap.put("x-dead-letter-exchange", "dead_exchange");
            // 设置死信 routingKey ,键 x-dead-letter-routing-key 固定
            paramMap.put("x-dead-letter-routing-key", "lisi");
            // paramMap.put("x-max-length", 6); // 队列最大长度

            // 申明普通队列 设置 paramMap 参数
            channel.queueDeclare("normal_queue",
                    false, false, false, paramMap);

            // 申明 dead_queue 死信队列
            channel.queueDeclare("dead_queue",
                    false, false, false, null);

            // 通过 routingKey: zhangsan,绑定普通队列和普通交换机
            channel.queueBind("normal_queue", "normal_exchange", "zhangsan");

            // 通过 routingKey: lisi,绑定死信队列和死信交换机
            channel.queueBind("dead_queue", "dead_exchange", "lisi");

            // 配置完毕,参数 0 正常退出
            System.exit(0);
        }
    }
}

修改消费者01,在其接收到消息时,拒绝应答或者否定应答

/*
死信队列实战
消费者 1
 */
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();

        if (channel != null) {

            // 拒绝应答消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {

                // requeue 设置为 false 代表拒绝重新入队,该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            };

            // 消费 normal_queue 队列的消息
            channel.basicConsume("normal_queue", deliverCallback, RabbitMQConst.cancelCallback);
            System.out.println("消费者1 监听消息...");
        }
    }
}

启动配置类,启动消费者01,启动消费者02,启动生产者,观察控制台,因为消费者01拒绝了所有的消息,其拒绝的消息被放到的了其绑定死信队列中,消费者02监听了死信队列的消息,将消费者01拒绝的消息进行了消费并打印到了控制台

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列

简单来说,延迟队列中的消息在到达后,不会被马上消费,而是在指定的时间过后才能被消费者消费

但是,延迟队列不是单个意义上的队列,需要将 TTL、死信队列结合起来才能构成一个延时队列

延迟队列使用场景

订单指定时间未支付(10分钟),过期自动取消

在创建订单时,将未支付的订单 id 存储在延迟队列中,设置队列的延时为 10 分钟,到达延时后,对应的消费者接收到队列中的消息,再次检查订单状态,根据订单状态进行对应的业务逻辑处理

新用户注册三天之内未登录,短信提醒

每当一个新用户注册成功,检查其登陆状态,若未登录,将其存入延迟队列,设置延时为三天,到达延时后,对该消息对应的用户进行短信提醒其登陆

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务

RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间

TTL 单位是毫秒,换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL设置的时间内没有被消费,则会成为 死信,如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL

消息设置 TTL,通过传参的方式

AMQP.BasicProperties properties = new
                    AMQP.BasicProperties()
                    // 设置过期时间
                    .builder().expiration("10000").build();
channel.basicPublish("normal_exchange", "zhangsan", properties, "hello".getBytes());

队列设置 TTL,依然通过传参的方式

paramMap.put("x-message-ttl", 5000);
channel.queueDeclare("normal_queue",false, false, false, paramMap);

二者区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列,则被丢到死信队列中)

而设置消息的 TTL,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了

想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息

整合 SpringBoot

创建 SpringBoot 工程,添加依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.dhj</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>1.0</version>
    <name>springboot-rabbitmq</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

配置文件

server.port=8888
spring.rabbitmq.host=192.168.116.130
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

Swagger2 配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig() {
        return new Docket(DocumentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }

    private ApiInfo webApiInfo() {
        return new ApiInfoBuilder().title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("dhj", "www.dhj.com", "18781075421@163.com"
                )).build();
    }
}

延时队列代码实战

编写配置类,在 SpringBoot 中,通过配置类的方式来申明队列 | 交换机 | 绑定关系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {

    private static final String X_EXCHANGE = "X";// 普通交换机名称 补充:ctrl + shift + u 大小写转换
    private static final String Y_EXCHANGE = "Y";// 死信交换机名称
    private static final String QA = "QA";// 普通队列1名称
    private static final String QB = "QB";// 普通队列2名称
    private static final String QD = "QD";// 死信队列名称

    /*
     申明普通交换机 X
     DirectExchange 代表 direct 类型的交换机 Y
     */
    @Bean("X")
    public DirectExchange xDirectExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // 申明死信交换机 Y
    @Bean("Y")
    public DirectExchange yDirectExchange() {
        return new DirectExchange(Y_EXCHANGE);
    }

    // 申明普通队列QA 设置参数绑定死信交换机
    Map<String, Object> paramMap = new HashMap<>(3);

    @Bean("QA")
    public Queue aQueue() {
        /*
         设置参数
         设置死信交换机
         设置 TTL 10 秒
         */
        paramMap.put("x-dead-letter-exchange", Y_EXCHANGE);
        paramMap.put("x-dead-letter-routing-key", "YD");
        paramMap.put("x-message-ttl", 10000);
        return QueueBuilder
                // 需要持久化的队列的名称
                .durable(QA)
                .withArguments(paramMap)
                .build();
    }

    // 申明普通队列QB 设置参数绑定死信交换机 Y
    @Bean("QB")
    public Queue bQueue() {
        /*
         设置参数
         设置死信交换机
         设置 TTL 5 秒
         */
        // 再次插入向 paramMap 中插入,相同键的值会被更新
        paramMap.put("x-dead-letter-exchange", Y_EXCHANGE);
        paramMap.put("x-dead-letter-routing-key", "YD");
        paramMap.put("x-message-ttl", 5000);
        return QueueBuilder
                // 需要持久化的队列的名称
                .durable(QB)
                .withArguments(paramMap)
                .build();
    }

    // 申明死信队列 无参数
    @Bean("QD")
    public Queue dQueue() {
        return new Queue(QD);
    }

    /*
    QA 绑定关系到交换机 X
    @Qualifier 注入指定名称的 bean
     */
    @Bean
    public Binding aQueueBindingX(@Qualifier("QA") Queue queue
            , @Qualifier("X") DirectExchange xExchange) {

        return BindingBuilder
                .bind(queue).to(xExchange)
                .with("XA");
    }

    /*
    QB 绑定关系到交换机 X
     */
    @Bean
    public Binding bQueueBindingX(@Qualifier("QB") Queue queue
            , @Qualifier("X") DirectExchange xExchange) {

        return BindingBuilder
                .bind(queue).to(xExchange)
                .with("XB");
    }

    /*
    QD 绑定关系到交换机 Y
     */
    @Bean
    public Binding dQueueBindingY(@Qualifier("QD") Queue queue
            , @Qualifier("Y") DirectExchange xExchange) {

        return BindingBuilder
                .bind(queue).to(xExchange)
                .with("YD");
    }
}

编写生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送延迟消息
     */
    @GetMapping("/send/{mesg}")
    public void sendMsg(@PathVariable("mesg") String mesg) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.info("当前时间:" + date + ",发送一条信息给两个 TTL 队列:" + mesg);

        // 发送消息 使用 RabbitTemplate
        rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl10 的队列" + mesg);
        rabbitTemplate.convertAndSend("X", "XB", "消息来自 ttl5 的队列" + mesg);
    }
}

编写消费者

package com.dhj.rabbitmqdemo.receiver;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 延迟队列的消费者 实际上也是死信队列
 */
@Slf4j
@Component
public class Consumer {

    // 接收指定队列的消息
    @RabbitListener(queues = "QD")
    public void receiverQD(Message message, Channel channel) throws Exception {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.info("当前时间:" + date + ",收到死信队列消息:" + new String(message.getBody()));
    }
}

启动时报了如下错,是因为在 paramMap.put("x-message-ttl", 10000); 设置 TTL 时,将毫秒数写成了字符串 paramMap.put("x-message-ttl", "10000"); 由报错中的 invalid arg 'x-message-ttl' 可以推断出

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid arg 'x-message-ttl' for queue 'QA' in vhost '/': {unacceptable_type,longstr}, class-id=50, method-id=10)

上述代码,申明了普通交换机 X,普通队列 QA | QB,X 和 QA | QB 之间分别通过 XA | XB 两个 routingKey 绑定,QA | QB 的 TTL 分别为 10s | 5s

QA | QB 两个队列又通过参数设置其与死信交换机 Y 通过 routingKey:YD 绑定,其死信都会通过参数设置的死信交换机 Y 传送到死信队列 QD 中

死信队列 QD 也通过 routingKey:YD 与死信交换机进行了绑定

消费者只监听死信队列中的消息

当通过 url 发送消息,通过交换机传递到 QA | QB 两个队列中,当消息在两个队列中存在时间超过 TTL 没有被消费,会通过死信交换机 Y 被投放到死信队列 QD 中,死信交换机 Y 为 direct 类型,Y 会将消息传递到与其绑定过 routingKey 的队列中去,因此死信队列 QD 最终会收到 TTL 过期的消息,相当于就指定了消息多长时间之后被消费

上述方式实际上是设置的队列的 TTL 属性

以上,就基本实现了一个延迟队列

延迟队列优化

上面的延迟队列的延时有 5s | 10s 两种,若要添加一种延时的情况,就需要再添加一个新的 TTL 队列,若是想要再添加100种,1000种情况呢,是否需要再添加1000个 TTL 队列?,可以看出,这种方式既不优雅也不实际,我们需要对其进行优化

如上图,新增一个不设置 TTL 的队列,通过生产者端来设置 TTL 发送到该队列中,在生产者端动态的决定 TTL 时长,而不是在队列中把 TTL 写死

这种方式实际上是在设置消息的 TTL 属性

修改配置类,添加如下

// 申明普通队列 QC 设置其死信参数
    @Bean("QC")
    public Queue cQueue() {
        Map<String, Object> cParamMap = new HashMap<>(2);
        cParamMap.put("x-dead-letter-exchange", Y_EXCHANGE);
        cParamMap.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder
                // 需要持久化的队列的名称
                .durable("QC")
                .withArguments(cParamMap)
                .build();
    }
/*
    QC 绑定关系到交换机 X
     */
    @Bean
    public Binding cQueueBindingX(@Qualifier("QC") Queue queue
            , @Qualifier("X") DirectExchange xExchange) {

        return BindingBuilder
                .bind(queue).to(xExchange)
                .with("XC");
    }

修改生产者

/**
     * 通过请求在生产者端发送消息并设置 TTL
     */
    @GetMapping("/sendExpiration/{msg}/{ttl}")
    public void sendMsg(@PathVariable("msg") String msg,
                        @PathVariable("ttl") Long ttl) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.info("当前时间:" + date + ",发送一条 TTL 为:" + ttl + " 毫秒的信息给队列,信息为:" + msg);

        rabbitTemplate.convertAndSend("X", "XC", msg, message -> {
            /*
            设置相关参数
            setExpiration 设置过期时间
             */
            message.getMessageProperties().setExpiration(ttl.toString());
            return message;
        });
    }

如上,通过 http://localhost:8888/ttl/sendExpiration/延时15秒/15000 就可以指定设置消息的延迟而不用新增队列

但是,大多数时候,结果总是差强人意,如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

基于插件实现延迟队列

下载插件 http://www.rabbitmq.com/community-plugins.html

将插件复制到 rabbitmq 的 plugins 目录

cp rabbitmq_delayed_message_exchange-3.8.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

进入 plugins 目录,执行安装命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装成功如下

重启 rabbitmq

systemctl restart rabbitmq-server

进入 web 管理页面,在 exchanges 选项中

使用插件后,整个延迟队列的架构也会发生变化

不用通过队列设置 TTL 来延迟消息,交换机就可以直接通过插件来管理消息的 TTL,后面也不需要死信队列,普通队列直接接收延迟消息即可

代码演示

代码架构

配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/*
延迟队列配置类
 */
@Configuration
public class DelayedQueueConfig {
    private static final String delayed_exchange = "delayed.exchange";// 交换机名称
    private static final String delayed_type = "x-delayed-message";// 交换机类型
    private static final String delayed_queue = "delayed.queue";// 队列名称
    private static final String delayed_routing_key = "delayed.key";// routingKey


    // 基于插件申明交换机
    @Bean("delayedX")
    public CustomExchange delayedExchange() {

        Map<String, Object> paramMap = new HashMap<>();

        /*
         延迟的类型
         实际上指的是广播消息的方式
         */
        paramMap.put("x-delayed-type", "direct");
        /*
        参数1 交换机名称
        参数2 交换机类型
        参数3 是否持久化
        参数4 是否自动删除
        参数5 其他参数 Map 集合 可不写
         */
        return new CustomExchange(delayed_exchange, delayed_type, false, false, paramMap);
    }

    /*
    构建队列
     */
    @Bean("delayedQueue")
    public Queue delayedQueue() {
        return new Queue(delayed_queue);
    }

    /*
    队列绑定
     */
    @Bean
    public Binding delayedBind(@Qualifier("delayedX") CustomExchange exchange,
                               @Qualifier("delayedQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(exchange).with(delayed_routing_key).noargs();
    }
}

生产者

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
@RequestMapping("/delayed")
@Slf4j
public class DelayedController {

    @Autowired
    RabbitTemplate template;

    @GetMapping("/send/{msg}/{delayedTime}")
    public void sendMsg(@PathVariable("msg") String msg
            , @PathVariable("delayedTime") Long time) {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        String date = format.format(new Date());
        log.info("当前时间:{},发送延迟消息:{},延迟时长:{}s", date, msg, time / 1000);

        template.convertAndSend("delayed.exchange", "delayed.key",
                msg, message -> {
                    message.getMessageProperties()
                            .setDelay(Integer.parseInt(time.toString()));
                    return message;
                });
    }
}

消费者

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@Slf4j
public class DelayedConsumer {

    // 接收指定队列的消息
    @RabbitListener(queues = "delayed.queue")
    public void receiverQD(Message message, Channel channel) throws Exception {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.info("当前时间:{},收到延迟队列的消息:{}", date, new String(message.getBody()));
    }
}

以上,通过插件就完成了一个较为符合预期的延时队列,解决了安装插件之前,消息的 TTL 被入队的顺序所覆盖

生产者生产消息,设置消息的延时,安装插件后,设置交换机的类型为 x-delayed-message 可以按照每个消息的延时时间来发送消息而不需要设置死信队列来专门接收延迟消息

发布确认高级

之前的提到过的发布确认,是在 RabbitMQ 服务正常的情况下,消息发布出去,能够通过 RabbitMQ 服务进行持久化、发布确认等操作保证消息的正确发送,现在,我们有必要考虑极端情况下如何来进行发布确认

在生产环境中由于一些不明原因,导致 rabbitmq 重启或者宕机,这时,不仅队列不可用,就连交换机也不可用了,整个生产者应用发布出去的消息就此石沉大海,导致了消息的丢失

由此我们也可看出,就算 RabbitMQ 整个服务都失效了,消息丢失的情况,无非也就是在交换机和队列二者之间出现,我们需要在这二者之间来解决极端环境下,消息的丢失问题

代码实现

代码架构图

配置文件

#发布确认的类型
spring.rabbitmq.publisher-confirm-type=correlated

/*
其他值
NONE 禁用发布确认

CORRELATED 发布消息成功到交换器后会触发回调方法

SIMPLE 经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,其二在发布消息成功后使用 rabbitTemplate调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broke
该模式实际上是同步确认消息,效率较低
*/

配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
发布确认高级配置类
 */
@Configuration
public class ConfirmConfig {

    public static final String exchange_name = "confirm.exchange";// 交换机名称
    public static final String routing_key = "key1";// routingKey
    public static final String queue_name = "confirm.queue";// 队列名称

    // 申明交换机
    @Bean("confirmEx")
    public DirectExchange confirmEx() {
        return new DirectExchange(exchange_name);
    }

    // 申明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder
                .durable(queue_name)
                .build();
    }

    // 绑定
    @Bean
    public Binding queueBind(@Qualifier("confirmEx") DirectExchange exchange
            , @Qualifier("confirmQueue") Queue queue) {

        return BindingBuilder.bind(queue).to(exchange).with(routing_key);
    }

}

生产者

    /**
     * 发布确认
     */
    @GetMapping("/sendConfirm/{msg}")
    public void sendConfirm(@PathVariable("msg") String msg) {

        rabbitTemplate.convertAndSend(ConfirmConfig.exchange_name, ConfirmConfig.routing_key, msg);
    }

消费者

@Component
@Slf4j
public class ConfirmConsumer {

    // 接收指定队列的消息
    @RabbitListener(queues = ConfirmConfig.queue_name)
    public void receiverQD(Message message, Channel channel) throws Exception {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.info("当前时间:{},收到发布确认队列的消息:{}", date, new String(message.getBody()));
    }
}

以上是发布确认的基本结构

生产者确认机制

修改生产者

  /**
     * 发布确认
     */
    @GetMapping("/sendConfirm/{msg}")
    public void sendConfirm(@PathVariable("msg") String msg) {
        /*
        CorrelationData 用于发布确认的回调参数
         */
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(ConfirmConfig.exchange_name, ConfirmConfig.routing_key,
                msg, correlationData);
    }

添加自定义的交换机发布确认回调接口实现类

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/*
发布确认回调
1.发消息,交换机接收到了消息,回调
    参数1 correlationData 保存回调消息的ID及相关信息
    参数2 ack 交换机收到消息 true
    参数3 cause null
2.发消息,交换机接收失,回调
    参数1 correlationData 保存回调消息的ID及相关信息
    参数2 ack false 交换机收到失败
    参数3 cause null 失败的原因

 */
@Component
@Slf4j
public class MyCallBacllk implements RabbitTemplate.ConfirmCallback {

    @Autowired
    RabbitTemplate template;

    /*
     注入
     因为 ConfirmCallback 是 RabbitTemplate 的一个内部接口
     而 MyCallBacllk 作为 ConfirmCallback 的实现类,并不在 RabbitTemplate 内部
     因此 MyCallBacllk 不会起作用

     需要先注入 RabbitTemplate
     在 @PostConstruct 修饰的方法中,通过注入的 RabbitTemplate 实例
     将 MyCallBacllk 实现类自身注入进去即可

     @PostConstruct
     该注解被用来修饰一个非静态的void()方法
     被 @PostConstruct 修饰的方法会在服务器加载 Servlet 的时候运行
     并且只会被服务器执行一次。PostConstruct 在构造函数之后执行,init()方法之前执行
     该注解修饰的方法,会在其他注解执行完成之后执行
     */
    @PostConstruct
    public void intoTemplate() {
        template.setConfirmCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机收到了消息,消息id为:{}", id);
        } else {
            log.info("交换机未收到id为:{} 的消息,原因为:{}", id,cause);
        }
    }
}

以上,在生产者发布消息后,通过自定义的回调接口实现类,会判断消息是否被交换机收到,返回不同结果

但是,此模式只针对交换机,在队列失效的时候,不会返回相关的结果且消息也被丢弃了

回退消息

在生产者确认机制的模式下,未被正确路由的消息(也就是未被正确投放到队列的消息),会被直接丢弃,可以通过设置 Mandatory(译: 强制性的) 参数,在消息路由失败时,将该消息回退给生产者

配置文件添加

#开启消息回退机制
spring.rabbitmq.publisher-returns=true

修改回调接口实现类

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/*
发布确认回调
1.发消息,交换机接收到了消息,回调
    参数1 correlationData 保存回调消息的ID及相关信息
    参数2 ack 交换机收到消息 true
    参数3 cause null
2.发消息,交换机接收失,回调
    参数1 correlationData 保存回调消息的ID及相关信息
    参数2 ack false 交换机收到失败
    参数3 cause null 失败的原因

 */
@Component
@Slf4j
public class MyCallBacllk implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    RabbitTemplate template;

    /*
     注入
     因为 ConfirmCallback 是 RabbitTemplate 的一个内部接口
     而 MyCallBacllk 作为 ConfirmCallback 的实现类,并不在 RabbitTemplate 内部
     因此 MyCallBacllk 不会起作用

     需要先注入 RabbitTemplate
     在 @PostConstruct 修饰的方法中,通过注入的 RabbitTemplate 实例
     将 MyCallBacllk 实现类自身注入进去即可

     @PostConstruct
     该注解被用来修饰一个非静态的void()方法
     被 @PostConstruct 修饰的方法会在服务器加载 Servlet 的时候运行
     并且只会被服务器执行一次。PostConstruct 在构造函数之后执行,init()方法之前执行
     该注解修饰的方法,会在其他注解执行完成之后执行
     */
    @PostConstruct
    public void intoTemplate() {
        template.setConfirmCallback(this);
        template.setReturnCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机收到了消息,消息id为:{}", id);
        } else {
            log.info("交换机未收到id为:{} 的消息,原因为:{}", id, cause);
        }
    }

    /*
    消息回退接口
    在消息传递过程中,只将不可达的消息返回给生产者
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息:{} 被交换机:{}回退了,回退的原因为:{},routingKey为:{}",
                message.getBody(), exchange, replyText, routingKey);
    }
}

通过开启回退机制,实现 RabbitTemplate.ReturnCallback 接口及其方法,就能够实现在消息未被正确路由时,也就是队列出现问题时的一个消息不丢失效果

总结

通过上述的【生产者确认机制】和【消息回退机制】我们就实现了一个:即使在交换机和队列都出现问题的情况下,做到消息不丢失

备份交换机

有了 mandatory 参数和回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何处理这些无法路由的消息,最多打个日志,然后触发报警,再来手动处理。而通过日志来处理这些无法路由的消息是很不优雅的做法,特别是当生产者所在的服务有多台机器的时候,手动复制日志会更加麻烦而且容易出错。而且设置 mandatory 参数会增加生产者的复杂性,需要添加处理这些被退回的消息的逻辑。

如果既不想丢失消息,又不想增加生产者的复杂性,该怎么做呢?

前面在设置死信队列的文章中,我们提到,可以为队列设置死信交换机来存储那些 处理失败 的消息,可是这些 不可路由消息 根本没有机会进入到队列,因此无法使用死信队列来保存消息。在 RabbitMQ 中,有一种备份交换机的机制存在,可以很好的应对这个问题。

什么是备份交换机呢?备份交换机可以理解为 RabbitMQ 中交换机的 备胎,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。当然,我们还可以建立一个报警队列,用独立的消费者来进行监测和报警…

代码实现

代码架构图

配置类修改

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
发布确认 | 备份交换机配置类
 */
@Configuration
public class ConfirmConfig {

    public static final String exchange_name = "confirm.exchange";// 交换机名称
    public static final String routing_key = "key1";// routingKey
    public static final String queue_name = "confirm.queue";// 队列名称
    public static final String backup_exchange = "backup.change";// 备份交换机
    public static final String backup_queue = "backup.queue";// 备份队列
    public static final String warning_queue = "warning.queue";// 报警队列

    // 申明交换机
    @Bean("confirmEx")
    public DirectExchange confirmEx() {

        return ExchangeBuilder
                .directExchange(exchange_name) // 交换机名称
                .durable(false)

                // 指定该交换机的备份交换机名称
                .withArgument("alternate-exchange", backup_exchange)
                .build();

    }


    // 备份交换机
    @Bean("backupEx")
    public FanoutExchange fanoutEx() {
        return new FanoutExchange(backup_exchange);
    }

    // 申明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder
                .durable(queue_name)
                .build();
    }


    // 备份队列
    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder
                .durable(backup_queue)
                .build();
    }

    // 报警队列
    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder
                .durable(warning_queue)
                .build();
    }


    // 绑定
    @Bean
    public Binding queueBind(@Qualifier("confirmEx") DirectExchange exchange
            , @Qualifier("confirmQueue") Queue queue) {

        return BindingBuilder.bind(queue).to(exchange).with(routing_key);
    }


    // 备份队列的绑定
    @Bean
    public Binding backQueueBind(
            @Qualifier("backupEx") FanoutExchange fanoutExchange,
            @Qualifier("backupQueue") Queue backupQueue,
            @Qualifier("warningQueue") Queue warningQueue) {

        return BindingBuilder.bind(backupQueue).to(fanoutExchange);
    }

    // 警告队列的绑定
    @Bean
    public Binding waringQueueBind(
            @Qualifier("backupEx") FanoutExchange fanoutExchange,
            @Qualifier("warningQueue") Queue warningQueue) {

        return BindingBuilder.bind(warningQueue).to(fanoutExchange);
    }
}

消费者

import com.dhj.rabbitmqdemo.config.ConfirmConfig;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Slf4j
@Component
public class BackUpConsumer {

    // 接收指定队列的消息
    @RabbitListener(queues = ConfirmConfig.warning_queue)
    public void receiverQD(Message message, Channel channel) throws Exception {

        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String date = format.format(new Date());

        log.warn("当前时间:{},收到备份交换机,警告队列的消息:{}", date, new String(message.getBody()));
    }
}

因为其余代码使用的是之前发布确认中的,在发布确认中,定义的消息回调接口在消息未正确路由时,没有打印信息,可以看出,备份交换机的优先级要高于 Mandatory

通过备份交换机,将未路由成功的消息,交给备份交换机,备份交换机再将其投递到与其绑定的队列中去,可以确保对未路由成功的消息进行一个处理,因为正常交换机和备份交换机同时路由失败的概率还是极低的

幂等性问题

概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用

举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息

解决思路

MQ 消费者的幂等性的解决一般使用全局ID 或者写个唯一标识比如时间戳或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过

消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发送了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID+指纹码机制,利用数据库主键去重 | b.利用 redis 的原子性去实现

唯一ID+指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式

Redis原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费

优先级队列

场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单、淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall 商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级

使用

注意:要让队列实现优先级需要做的事情有如下

队列需要设置为优先级队列
消息需要设置消息的优先级
消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序

优先级越大越先执行,范围 0 ~ 255

代码演示

生产者

import com.dhj.demo.constant.RabbitMQConst;
import com.dhj.demo.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * mq 消息队列 消息生产者
 */
public class Producer {
    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 发送消息
    public static void main(String[] args) {
        try {// 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();

            // 设置工厂 ip ,用于连接 mq 的队列
            factory.setHost("192.168.116.130");

            // 设置用户名 & 密码
            factory.setUsername("admin");
            factory.setPassword("111111");

            // 创建连接
            Connection connection = factory.newConnection();

            // 创建信道
            Channel channel = connection.createChannel();

            channel.basicQos(1);

            Map<String, Object> paramMap = new HashMap<>();

            /*
            在队列中添加优先级范围
            官方允许 0 ~ 255
            自定义设置一个值 x 后,范围就为 0 ~ x
            建议不要设置过大,浪费 cpu 和 内存
            因为会根据优先级的进行排序
            范围越大,排序越浪费资源
             */
            paramMap.put("x-max-priority", 10);
            channel.queueDeclare(QUEUE_NAME, true, false, false, paramMap);

            // 发消息
            String message = "Hello World!";

            for (int i = 0; i < 10; i++) {
                if (i == 5) {
                    // priority(5) 设置消息的优先级为 5
                    AMQP.BasicProperties properties
                            = new AMQP.BasicProperties()
                            .builder().priority(5).build();
                    channel.basicPublish("", QUEUE_NAME, properties,
                            "HelloPriority".getBytes());
                } else {
                    channel.basicPublish("", QUEUE_NAME, null,
                            message.getBytes());
                }
            }
            System.out.println("-----消息发送完毕!-----");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * mq 消息队列 消费者
 */
public class Consumer {

    // 队列名称,表示接收此队列的消息
    public static final String QUEUE_NAME = "hello";

    // 接受消息
    public static void main(String[] args) {

        try {
            // 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();

            // 设置连接参数
            factory.setHost("192.168.116.130");
            factory.setUsername("admin");
            factory.setPassword("111111");

            // 获取连接
            Connection connection = factory.newConnection();

            // 创建信道
            Channel channel = connection.createChannel();

            // 声明未成功消费的回调,使用 lambda 表达式的匿名内部类进行声明,重写其中的 handle 方法
            DeliverCallback deliverCallback = (consumerTag, message) -> {

                /*
                 直接打印消息,也就是被消费者消费的消息
                 参数 message 是一个 Delivery(交付) 对象
                 可以使用 String 构造器,将 message 的消息体转化为字符串,也就是具体的消息字符串
                 */
                System.out.println(new String(message.getBody()));
            };

            // 声明消费者取消消费的回调
            CancelCallback cancelCallback = (consumerTag) -> {
                // 取消消息
                System.out.println("消费者消息消费被取消");
            };

            /*
             消费者接收消息
             参数1 消费那个队列的消息(队列名)
             参数2 消费成功之后,是否要自动应答(true)
             参数3 消费者未成功消费的回调
             参数4 消费者取消消费的回调
             */
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这里为了方便,使用的是之前原始的生产者 | 消费者代码进行修改而来

允许代码,观察控制台,发现,尽管消息内容为 HelloPriority 的消息不是最开始发送的,但是因为其优先级高,因此最先被消费了

惰性队列

使用场景

RabbitMQ从3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候

两种模式

队列具备两种模式:default和lazy。默认的为default模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的

在队列声明的时候可以通过 x-queue-mode 参数来设置队列的模式,取值为 defaultlazy 下面示例中演示了一个惰性队列的声明细节

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue",false,false,false,args);

内存开销对比

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB

RabbitMQ 集群

略,推荐查看尚硅谷官方笔记

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-11 16:42:24  更:2021-07-11 16:44:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 18:54:47-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码