RabbitMQ学习总结(中)——发布确认、交换机和死信队列
一、发布确认
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他可以是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
开启发布确认的方法:
发布确认默认是没有开启的,如果要在发布者中开启需要调用方法confirmSelect ,每当你要想使用发布确认,都需要在 channel 上调用该方法
channel.confirmSelect();
1.1 单个确认发布
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
public class ConfirmMessage {
public static int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
publishMessageSingle();
}
public static void publishMessageSingle() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
boolean flag = channel.waitForConfirms();
if(flag) {
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end-begin) + "秒");
}
}
执行结果: 
1.2 批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
代码示例:
public class ConfirmMessage {
public static int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
publishMessageBatch();
}
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
int batchSize = 100;
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
if(i % batchSize == 0){
boolean flag = channel.waitForConfirms();
if(flag) {
System.out.println("消息发送成功");
}
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end-begin) + "秒");
}
}
执行结果: 
1.3 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。  如何处理异步未确认消息?
最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
public class ConfirmMessage {
public static int MESSAGE_COUNT = 1000;
public static void main(String[] args) throws Exception {
publishMessageAsync();
}
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
channel.confirmSelect();
long begin = System.currentTimeMillis();
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if(multiple) {
ConcurrentNavigableMap<Long, String> confirmd =
outstandingConfirms.headMap(deliveryTag);
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认的消息:" + deliveryTag);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("未确认的消息是:" + message + "未确认的消息tag:" + deliveryTag);
};
channel.addConfirmListener(ackCallback,nackCallback);
for (int i = 1; i <= MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end-begin) + "秒");
}
}
执行结果: 
总结:
单独发布消息
批量发布消息
- 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。
异步处理:
- 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
二、交换机
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
发布消息方法:  第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
临时队列:
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建出来之后长成这样: 
绑定 binding:
binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。 
2.1 Fanout exchange(发布/订阅模式)
Fanout exchange又叫发布订阅模式。扇出交换机将消息路由到与其绑定的所有队列 ,并且路由键将被忽略 。如果将N个队列绑定到扇出交换,则将新消息发布到该交换时,会将消息的副本传递到所有N个队列。扇出交换机非常适合消息的广播路由 注意:fanout类型的exchange会把消息推到所有的queue中,所以不需要指定routingkey,指定了也没用
系统中默认有fanout类型的exchange 
实现效果:EmitLog(生产者)发送消息给两个消费者接收并打印接收到的信息  代码示例:
EmitLog 发送消息给两个消费者接收:
public class EmitLog {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生成这发出消息:" + message);
}
}
}
ReceiveLogs02将接收到的消息打印
public class ReceiveLogs01 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("ReceiveLogs01等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
}
}
ReceiveLogs02将接收到的消息打印在控制台
public class ReceiveLogs02 {
public static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("ReceiveLogs02等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
};
channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
}
}
效果展示: 
2.2 Direct exchange(路由模式)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey (路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey 。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key 进行判断,只有队列的Routingkey 与消息的 Routingkey 完全一致,才会接收到消息
 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定:
 当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。
实战实现效果:
 c2:绑定disk,routingKey为error c1:绑定console,routingKey为info、warning
-
生产者: public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info", "普通 info 信息");
bindingKeyMap.put("warning", "警告 warning 信息");
bindingKeyMap.put("error", "错误 error 信息");
bindingKeyMap.put("debug", "调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
-
消费者C1: public class ReceiveLogsDirect01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console",false,false,false,null);
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
System.out.println("ReceiveLogsDirect01等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
System.out.println("info和warning 消息已经接收:\n" + message);
};
channel.basicConsume("console",true,deliverCallback,(consumerTag) -> {});
}
}
-
消费者C2: public class ReceiveLogsDirect02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk",false,false,false,null);
channel.queueBind("disk",EXCHANGE_NAME,"error");
System.out.println("ReceiveLogsDirect02等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
System.out.println("error 消息已经接收:\n" + message);
};
channel.basicConsume("disk",true,deliverCallback,(consumerTag) -> {});
}
}
-
执行结果: 
2.3 Topics 模式
Topic 类型的Exchange 与Direct 相比,都是可以根据RoutingKey 把消息路由到不同的队列。只不过Topic 类型Exchange 可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
Topic的要求:
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的:
- *(星号)可以代替一个单词
- #(井号)可以替代零个或多个单词
Topic匹配案例:
下图绑定关系如下: 
- Q1–>绑定的是
- 中间带 orange 带 3 个单词的字符串
(*.orange.*) - Q2–>绑定的是
- 最后一个单词是 rabbit 的 3 个单词
(*.*.rabbit) - 第一个单词是 lazy 的多个单词
(lazy.#)
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
例子 | 说明 |
---|
quick.orange.rabbit | 被队列 Q1Q2 接收到 | azy.orange.elephant | 被队列 Q1Q2 接收到 | quick.orange.fox | 被队列 Q1 接收到 | lazy.brown.fox | 被队列 Q2 接收到 | lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 | quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 | quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 | lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
注意:
- 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
实战实现效果:实现上面的匹配案例

-
生产者 public class EmitLogTopic {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> stringEntry : bindingKeyMap.entrySet()) {
String routingKey = stringEntry.getKey();
String message = stringEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息:" + message);
}
}
}
-
消费者C1
public class ReceiveLogsTopic01 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("Q1等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Q1控制台打印接收到的消息:" + new String(message.getBody()));
System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,(consumerTag) ->{});
}
}
-
消费者C2
public class ReceiveLogsTopic02 {
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("Q2等待接收消息,把接收到的消息打印在屏幕上.....");
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("Q2控制台打印接收到的消息:" + new String(message.getBody()));
System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
};
channel.basicConsume(queueName,true,deliverCallback,(consumerTag) ->{});
}
}
-
执行结果: 
三、死信队列
死信队列简介:
-
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 -
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源:
-
消息 TTL 过期,TTL是Time To Live的缩写, 也就是生存时间 -
队列达到最大长度,队列满了,无法再添加数据到 mq 中 -
消息被拒绝,(basic.reject 或 basic.nack) 并且 requeue=false(手动确认)
死信实战效果: 
3.1 死信之TTl
-
消费者C1,用于消费正常队列里的消息 public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息" + message);
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,(consumerTag) ->{});
}
}
-
消费者C2,用于消费死信队列里的消息 public class Consumer02 {
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
System.out.println("等待接收消息........... ");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收到消息" + message);
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,(consumerTag) ->{});
}
}
-
生产者 public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
}
}
-
运行测试:启动 C1 ,之后关闭消费者,模拟其接收不到消息。再启动生产者
 以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息 
3.2 死信之最大长度
-
消息生产者代码去掉TTL属性  -
C1消费者中设置队列的最大的长度,C2代码不变 
params.put("x-max-length",6);
-
运行测试 C1启动之后关闭该消费者 模拟其接收不到消息,之后启动C2消费者 
3.3 死信之消息被拒
-
生产者和C2消费者的代码不变,在C1消费中设置拒收消息 "info5"  -
模拟运行
启动生产者  启动消费者C1然后再启动消费者C2 
|