各个MQ介绍
ActiveMQ

kafak

RocketMQ

RabbitMQ

MQ的选择

RabbitMQ
四大核心概念
 

原理名词解释
  
Hello Word
环境准备
maven环境
<!--指定 jdk 编译版本--> <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
生产者
package com.wh;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class HelloWordDemo {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello word";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成");
}
}
  
消费者
package com.wh.helloword;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback=(customerTag,message)->{
System.out.println("未成功消费的回调==>"+new String(message.getBody()));
};
CancelCallback cancelCallback=(customerTag)->{
System.out.println("取消消费的回调==>消息消费中断");
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

Work Queues(工作模式)
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。 
轮训分发消息
工具类
package com.wh.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
消费者
package com.wh.workQueues;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class Work01 {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback=(customerTag, message)->{
System.out.println("未成功消费的回调==>"+new String(message.getBody()));
};
CancelCallback cancelCallback=(customerTag)->{
System.out.println("取消消费的回调==>消息消费中断");
};
System.out.println("C1等待接收消息");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
生产者
package com.wh.workQueues;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task01 {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner scanner=new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,next.getBytes());
System.out.println("发送消息完成=>"+next);
}
}
}
结果

 
消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡 ,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制 , 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用 。
消息应答方法
A.Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack(用于否定确认)
C.Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了
Multiple 的解释
建议用false:不批量应答 手动应答的好处是可以批量应答并且减少网络拥堵  multiple 的 true 和 false 代表不同意思
true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
 
消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
消息手动应答代码
生产者
package com.wh.Ack;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.Scanner;
public class Task02 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String next = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, next.getBytes("UTF-8"));
System.out.println("生产者发出消息");
}
}
}
消费者
package com.wh.Ack;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import com.wh.utils.SleepUtils;
public class Work03 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C1等待接收消息");
DeliverCallback deliverCallback=(customerTag,message)->{
SleepUtils.sleep(1);
System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(customerTag)->{
System.out.println("取消消费的回调==>消息消费中断");
});
}
}
package com.wh.Ack;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import com.wh.utils.SleepUtils;
public class Work04 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("C2等待接收消息");
DeliverCallback deliverCallback = (customerTag, message) -> {
SleepUtils.sleep(30);
System.out.println("接收到消息:" + new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (customerTag) -> {
System.out.println("取消消费的回调==>消息消费中断");
});
}
}
手动应答效果演示
 在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了   
持久化
刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消 息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列 和消息 都标记为持久化。
队列的持久化
 
消息的持久化

不公平分发(消费者端)
 为了避免这种情况,我们可以设置参数 channel.basicQos(1);  
代码及演示
  

预取值
    
publish/Subscribe(发布确认模式)

生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始) ,一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法 
单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式 ,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢 ,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
package com.wh.confirm;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.UUID;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
ConfirmMessage.publishMessageIndividually();
}
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queue = UUID.randomUUID().toString();
channel.queueDeclare(queue, true, false, false, null);
channel.confirmSelect();
long start = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queue, null, message.getBytes());
boolean confimOK = channel.waitForConfirms();
if (confimOK) {
System.out.println("发布成功");
}
}
long end = System.currentTimeMillis();
System.out.println("====发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
}
}

批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
package com.wh.confirm;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.UUID;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
ConfirmMessage.publishMessageBatch();
}
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queue = UUID.randomUUID().toString();
channel.queueDeclare(queue, false, false, false, null);
channel.confirmSelect();
long start = System.currentTimeMillis();
int messageBatch=100;
int outStandingMessageCount= 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queue, null, message.getBytes());
outStandingMessageCount++;
if(messageBatch == outStandingMessageCount){
channel.waitForConfirms();
outStandingMessageCount=0;
}
}
if(outStandingMessageCount>0){
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("====发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
}
}

异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的 ,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。 
package com.wh.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.wh.utils.RabbitMqUtils;
import java.util.UUID;
public class ConfirmMessage {
public static final int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
ConfirmMessage.publishMessageAsync();
}
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queue = UUID.randomUUID().toString();
channel.queueDeclare(queue, false, false, false, null);
channel.confirmSelect();
long start = System.currentTimeMillis();
ConfirmCallback ackConfirmCallback=(deliveryTag,multiple)->{
};
ConfirmCallback nackConfirmCallback=(deliveryTag,multiple)->{
System.out.println("未确认的消息:"+deliveryTag);
};
channel.addConfirmListener(ackConfirmCallback,nackConfirmCallback);
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queue, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布"+MESSAGE_COUNT+"条消息,共耗时"+(end-start)+"毫秒");
}
}

如何处理异步未确认消息
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列 ,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

  
交换机
在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者 。这种模式称为 ”发布/订阅”
Exchanges
Exchanges 概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange) ,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定
Exchanges 的类型
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
无名 exchange
在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。  第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
临时队列
之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列 ,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除 。
创建临时队列的方式如下: String queueName = channel.queueDeclare().getQueue();
绑定
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定 
Fanout(扇出)
Fanout 介绍
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些 exchange 类型 
Fanout 实战
消费者
package com.wh.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把收到的消息打印到屏幕上");
DeliverCallback deliverCallback=(customerTag,delivery)->{
System.out.println("ReceiveLogs01控制台打印接受到的消息:"+new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume(queue,true,deliverCallback,(customerTag)->{});
}
}
package com.wh.exchange;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"");
System.out.println("ReceiveLogs02等待接收消息,把收到的消息打印到屏幕上");
DeliverCallback deliverCallback=(customerTag,delivery)->{
System.out.println("ReceiveLogs02控制台打印接受到的消息:"+new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume(queue,true,deliverCallback,(customerTag)->{});
}
}
生产者
package com.wh.exchange;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.Scanner;
public class EmitLog {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String next = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes());
System.out.println("生产者发出消息:"+next);
}
}
}
结果
  
Direct(直接)
回顾
在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:队列只对它绑定的交换机的消息感兴趣。 绑定用参数:routingKey 来表示也可称该参数为 binding key,创建绑定我们用码:channel.queueBind(queueName,EXCHANGE_NAME, “routingKey”);绑定之后的意义由其交换类型决定。

消费者
package com.wh.exchange.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console", false, false, false, null);
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "wraning");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
System.out.println("ReceiveLogsDirect01控制台接收消息:" + new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume("console", true, deliverCallback, (customerTag) -> {
});
}
}
package com.wh.exchange.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk", false, false, false, null);
channel.queueBind("disk", EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
System.out.println("ReceiveLogsDirect02控制台接收消息:" + new String(delivery.getBody(), "UTF-8"));
};
channel.basicConsume("disk", true, deliverCallback, (customerTag) -> {
});
}
}
生产者
package com.wh.exchange.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.Scanner;
public class DirectLog {
private static final String EXCHANGE_NAME="direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String next = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,next.getBytes());
System.out.println("生产者发出消息:"+next);
}
}
}
测试结果
routind key==info
  
routind key==warning
  
routind key==error
  
Topics(主题)
之前类型的问题
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机 ,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型。
Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。 这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词 #(井号)可以替代零个或多个单词
Topic 匹配案例
下图绑定关系如下
Q1–>绑定的是 --------中间带 orange 带 3 个单词的字符串(.orange.)
Q2–>绑定的是 --------最后一个单词是 rabbit 的 3 个单词(..rabbit) --------第一个单词是 lazy 的多个单词(lazy.#)
   当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
实战
消费者
package com.wh.exchange.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, customerTag -> {
});
}
}
package com.wh.exchange.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q2";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接收队列 :"+QUEUE_NAME+" 绑 定 键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, customerTag -> {
});
}
}
生产者
package com.wh.exchange.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
HashMap<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit"," 被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant"," 被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox"," 被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox"," 被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit"," 虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox"," 不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit"," 是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit"," 是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:"+message);
}
}
}
结果
   
死信队列
死信的概念

死信来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
死信实战
代码架构图

死信实战–过期时间
消费者(普通消费者)
package com.wh.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import java.util.HashMap;
public class Customer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
HashMap<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Consumer01等待接收消息..........");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (customerTag) -> {
});
}
}
生产者
package com.wh.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message="info:"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:"+message);
}
}
}
结果
先启动普通消费者,让交换机,队列被创建并进行绑定。    
再关闭普通消费者后,再启动生产者发消息,模拟消息超时进入私信队列。 

消费者(死信消费者)
package com.wh.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import java.util.HashMap;
public class Customer02 {
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("死信消费者Consumer02等待接收消息..........");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("死信消费者Consumer02 接收到消息" + message);
};
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, (customerTag) -> {
});
}
}
结果

死信实战–队列达到最大长度
生产者
注释掉过期时间即可 
消费者(普通消费者)
限制队列的长度 
结果
启动普通消费者  删除普通队列后,再启动普通消费者,注册上普通队列 
停掉普通消费者后,生产者发送10条消息,为了不让普通消费者消费消息,让消息能够积压  
死信实战–消息被拒
消费者(普通消费者)
package com.wh.dead;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.wh.utils.RabbitMqUtils;
import java.util.HashMap;
public class Customer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
HashMap<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
System.out.println("Consumer01等待接收消息..........");
DeliverCallback deliverCallback = (customerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if(message.equals("info:4")){
System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (customerTag) -> {
});
}
}
生产者
package com.wh.dead;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.wh.utils.RabbitMqUtils;
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
for (int i = 0; i < 10; i++) {
String message="info:"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:"+message);
}
}
}
结果
  
延迟队列
延迟队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列使用场景
  
RabbitMQ 中的 TTL
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。
通过队列属性设置,队列中所有消息都具有相同的过期时间
对消息本身进行单独设置,只该条消息具有过期时间
如果同时设置,则消息的TTL以较小的值为准。
消息在队列中的生存时间一旦超过设置的TTL值,就会变成死信(Dead Message。
消息设置 TTL(最好生产者-灵活)

队列设置 TTL
第一种是在创建队列的时候设置队列的“x-message-ttl”属性 
二者区别

rabbit整合Springboot
pom
<dependencies>
<!--RabbitMQ 依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>
<!--RabbitMQ 测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
swagger
package com.wh.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("enjoy6288", "http://atguigu.com",
"1551388580@qq.com"))
.build();
}
}
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
server:
port: 8899
队列 TTL
代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下: 
配置文件类代码
package com.wh.ttlQueue;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
private static final String X_EXCHANGE = "X";
private static final String Y_DEAD_LETTER_EXCHANGE = "Y";
private static final String QUEUE_A = "QA";
private static final String QUEUE_B = "QB";
private static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments=new HashMap<>(4);
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","YD");
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments=new HashMap<>(4);
arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key","YD");
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean
public Binding queueBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange")DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
消息生产者代码
package com.wh.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/seandMsg/{message}")
public void sendMsg(@PathVariable("message") String message) {
log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列"+message);
}
}
消息消费者代码
package com.wh.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
结果
  第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 和 40S两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
延时队列优化
代码架构图
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间 
配置文件类代码
  
消息生产者代码
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), ttlTime, message);
rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
结果
 
 看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为RabbitMQ 只会检查第一个消息是否过期 ,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
Rabbitmq 插件实现延迟队列
上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题
安装延时队列插件
 执行命令让插件生效   重启RabbitMQ后 
情况的不同
延时队列–基于队列的延迟

延时队列–基于交换机的延迟

代码架构图

配置文件类代码
package com.wh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedExchangeConfig {
private static final String DELAYED_QUEUE_NAME = "delayed_queue";
private static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
private static final String DELAYED_ROUTING_KEY = "delayed_routingKey";
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,arguments);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
消息生产者代码
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable("message") String message, @PathVariable("delayTime") Integer delayTime) {
log.info("当前时间:{},发送一条时长{}毫秒 TTL 信息给队列 C:{}", new Date(), delayTime, message);
rabbitTemplate.convertAndSend("delayed_exchange", "delayed_routingKey", message, msg -> {
msg.getMessageProperties().setDelay(delayTime);
return msg;
});
}
消息消费者代码
package com.wh.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class DeadLetterQueueConsumer {
private static final String DELAYED_QUEUE_NAME="delayed_queue";
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message,Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟交换机的消息:{}",new Date().toString(),msg);
}
}
结果
发送请求: http://localhost:8899/ttl/sendDelayMsg/延迟消息-20000/20000 http://localhost:8899/ttl/sendDelayMsg/延迟消息-2000/2000

总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
发布确认高级
在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:
发布确认 springboot 版本
确认机制方案
 交换机和队列都有可能出错导致接受不到消息 1:交换机出错 (交换机收不到消息) 2:队列出错(交换机收到消息,发给队列出问题) 3:交换机和队列都出错了(也归到交换机收不到消息)
代码架构图

配置文件
在配置文件当中需要添加 spring.rabbitmq.publisher-confirm-type=correlated   
添加配置类
package com.wh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME="confirm.exchange";
public static final String CONFIRM_QUEUE_NAME="confirm.queue";
public static final String CONFIRM_ROUTING_KEY="key1";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue,@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
消息生产者
package com.wh.controller;
import com.wh.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class SendConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
CorrelationData correlationData2 = new CorrelationData();
correlationData2.setId("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"111",
ConfirmConfig.CONFIRM_ROUTING_KEY, message+"模拟交换机出错", correlationData2);
CorrelationData correlationData3 = new CorrelationData();
correlationData3.setId("3");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData3);
}
}
回调接口
package com.wh.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId() != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
}
消息消费者
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMsg(Message message) {
String msg = new String(message.getBody());
log.info("接受到队列 confirm.queue 消息:{}", msg);
}
结果演示

 第三条消息因为routingkey不对,所以消息被直接丢弃了 。
回退消息
Mandatory 参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的 。那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,我好自己处理啊。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
消息生产者代码
package com.wh.controller;
import com.wh.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/confirm")
@Slf4j
public class SendConfirmController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
log.info("生产者发送消息:{},id为:{}",message,"1");
CorrelationData correlationData2 = new CorrelationData();
correlationData2.setId("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"111",
ConfirmConfig.CONFIRM_ROUTING_KEY, message+"模拟交换机出错", correlationData2);
log.info("生产者发送消息:{},id为:{}",message,"2");
CorrelationData correlationData3 = new CorrelationData();
correlationData3.setId("3");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData3);
log.info("生产者发送消息:{},id为:{}",message,"3");
}
}
回调接口
package com.wh.callback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId() != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String
exchange, String routingKey) {
log.error(" 消 息 {}, 机 被 交 换 机 {} 因 退 回 , 退 回 原 因 :{}, 路 由 key:{}",new String(message.getBody()),exchange,replyText,routingKey);
}
}
 
结果
 
备份交换机

代码架构图

修改配置类
package com.wh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME).build();
}
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
@Bean("backupExchange")
public FanoutExchange backupExchange() {
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("backupQueue")
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}
@Bean("warningQueue")
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
@Bean
public Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(backupQueue).to(backupExchange);
}
@Bean
public Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) {
return BindingBuilder.bind(warningQueue).to(backupExchange);
}
}
  
消费者
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message) {
String msg = new String(message.getBody());
log.info("报警发现不可路由消息:{}", msg);
}
生产者
@GetMapping("/sendBackMessage/{message}")
public void sendBackMessage(@PathVariable("message") String message) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY, message, correlationData);
log.info("生产者发送消息:{},id为:{}",message,"1");
CorrelationData correlationData2 = new CorrelationData();
correlationData2.setId("2");
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY+"123", message+"模拟队列出错", correlationData2);
log.info("生产者发送消息:{},id为:{}",message,"2");
}
结果

 mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。
RabbitMQ 其他知识点
幂等性
概念

消息重复消费

解决思路

消费端的幂等性保障

唯一 ID+指纹码机制

Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
优先级队列
使用场景
 
如何添加
 d.注意事项 要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
实战
配置类
package com.wh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class PriorityConfig {
public static final String PRIORITY_EXCHANGE_NAME = " priority.exchange";
public static final String PRIORITY_QUEUE_NAME = " priority.queue";
public static final String PRIORITY_ROUTING_KEY = "priority";
@Bean("priorityExchange")
public DirectExchange priorityExchange(){
return ExchangeBuilder.directExchange(PRIORITY_EXCHANGE_NAME).build();
}
@Bean("priorityQueue")
public Queue priorityQueue(){
return QueueBuilder.durable(PRIORITY_QUEUE_NAME).maxPriority(10).build();
}
@Bean
public Binding priorityBinding(@Qualifier("priorityQueue")Queue priorityQueue,@Qualifier("priorityExchange")DirectExchange priorityExchange){
return BindingBuilder.bind(priorityQueue).to(priorityExchange).with(PRIORITY_ROUTING_KEY);
}
}
生产者
package com.wh.controller;
import com.wh.config.PriorityConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/priority")
public class SendPriorityController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg")
public void sendMsg() {
for (int i = 1; i < 11; i++) {
int finalI = i;
String message = "info" + i;
CorrelationData correlationData = new CorrelationData();
correlationData.setId(i + "");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(finalI);
messageProperties.setDelay(10000);
rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
PriorityConfig.PRIORITY_ROUTING_KEY, MessageBuilder.withBody(message.getBytes()).andProperties(messageProperties).build(), correlationData);
}
}
}
先不启动消费者-发送消息
 消息成功到达Exchange,以为开启了ConfirmCallback  队列也有了10条 
消费者
@RabbitListener(queues = PriorityConfig.PRIORITY_QUEUE_NAME)
public void receivePriorityQueue(Message message, Channel channel ) throws IOException, InterruptedException {
String msg = new String(message.getBody());
if(msg.equals("info5")){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
log.info("队列优先级消费者拒绝接收:{}",msg);
}else{
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info("消费者接收到-有队列优先级的消息:{}", msg);
}
}
启动消费者-接收消息
 接收的消息按顺序接收
惰性队列
使用场景

两种模式

内存开销对比

springboot整合RabbitMQ发送Json
配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQJsonConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
对象
package com.wh.controller;
public class Type {
private String type;
private String value;
public Type() {
}
public Type(String type, String value) {
this.type = type;
this.value = value;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
@Override
public String toString() {
return "Type{" +
"type='" + type + '\'' +
", value='" + value + '\'' +
'}';
}
}
消费者
@RabbitListener(queues = PriorityConfig.PRIORITY_QUEUE_NAME)
public void receivePriorityQueue(Type type, Channel channel ) throws IOException, InterruptedException {
System.out.println("json接收:"+type);
}
简单发送
生产者
@GetMapping("/sendMsg")
public void sendMsg() {
Type type = new Type("info", "11");
String s = JSON.toJSONString(type);
log.info("s{},{}", 11, s);
rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
PriorityConfig.PRIORITY_ROUTING_KEY, type);
}
结果
 
复杂发送
生产者
@GetMapping("/sendMsg")
public void sendMsg() {
for (int i = 1; i < 11; i++) {
Map<String, String> mqMsg = new HashMap<>();
int finalI = i;
mqMsg.put("type", "info");
mqMsg.put("value", i + "");
CorrelationData correlationData = new CorrelationData();
correlationData.setId(i + "");
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(finalI);
messageProperties.setDelay(10000);
String s = JSON.toJSONString(mqMsg);
log.info("s{},{}", i, s);
rabbitTemplate.convertAndSend(PriorityConfig.PRIORITY_EXCHANGE_NAME,
PriorityConfig.PRIORITY_ROUTING_KEY, MessageBuilder.withBody(s.getBytes())
.andProperties(messageProperties).build(), correlationData);
}
}
结果
Content-Type:application/octet-stream,从字面意思得知,只可以上传二进制数据  
友情参考
感谢B站的尚硅谷: https://www.bilibili.com/video/BV1cb4y1o7zz?p=84
|