含义
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信队列。
出现场景
“死信”是 RabbitMQ 中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时 requeue 属性被设置为 false。
- 消息在队列的存活时间超过设置的 TTL(消息过期)时间。
- 消息队列的消息数量已经超过最大队列长度。
案例一:消息过期
架构
组件配置
@Configuration
public class RabbitDeadLetterConfig {
public static final String DEAD_QUEUE_NAME = "dead_queue";
public static final String DEAD_EXCHANGE_NAME = "dead_exchange";
public static final String NORMAL_QUEUE_NAME = "normal_queue";
public static final String NORMAL_EXCHANGE_NAME = "normal_exchange";
public static final String DEAD_ROUTING_KEY = "dead_routing_key";
public static final String NORMAL_ROUTING_KEY = "normal_routing_key";
@Bean
public Queue normalQueue() {
Map<String, Object> param = new HashMap<>();
param.put("x-dead--letter-exchange", DEAD_EXCHANGE_NAME);
param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return new Queue(NORMAL_QUEUE_NAME, true, false,false, param);
}
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE_NAME);
}
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE_NAME);
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE_NAME);
}
@Bean
public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);
}
@Bean
public Binding deadBinding(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY);
}
}
生产者
@Component
@Slf4j
public class DeadLetterSender {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
String msg1 = "dead msg which ttl is 10s";
String msg2 = "dead msg which ttl is 1000s";
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
RabbitDeadLetterConfig.NORMAL_ROUTING_KEY, msg1, (message) -> setExpireTime(message, "10000"));
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
RabbitDeadLetterConfig.NORMAL_ROUTING_KEY, msg2, (message) -> setExpireTime(message, "1000000"));
}
private Message setExpireTime(Message message, String expireTime) {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setExpiration(expireTime);
return message;
}
}
消费者
正常消费者
@Component
@RabbitListener(queues = RabbitDeadLetterConfig.NORMAL_QUEUE_NAME)
@Slf4j
public class NormalConsumer {
@RabbitHandler
public void consume(String msg) {
log.info("normal consumer received msg:" + msg);
}
}
死信消费者
@Component
@RabbitListener(queues = RabbitDeadLetterConfig.DEAD_QUEUE_NAME)
@Slf4j
public class DeadConsumer {
@RabbitHandler
public void consume(String msg) {
log.info("dead consumer received msg:" + msg);
}
}
测试
@GetMapping("/dead")
public void deadLetter() {
deadLetterSender.send();
}
首先将正常消费者注释掉,启动程序,调用接口触发生产者生产消息。其中一条消息因有过期时间,当它在10s内不能消费时便会过期,此时看死信消费者能否收到消息 死信消费者成功接收到消息 取消掉正常消费者的注释启动程序,此时另外一条消息被正常消息者消费。
案例二:队列长度已满
组件配置
重新定义一个队列,将此队列绑定到案例一的交换机,其他组件仍使用案例一的配置
@Bean
public Queue sizeQueue() {
Map<String, Object> param = new HashMap<>();
param.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
param.put("x-max-length", 5);
return new Queue(SIZE_DEAD_QUEUE_NAME, true, false,false, param);
}
@Bean
public Binding sizeDeadBinding(Queue sizeQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(sizeQueue).to(normalExchange).with(SIZE_ROUTING_KEY);
}
生产者
@Component
@Slf4j
public class SizeDeadLetterSender {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 1; i < 11; i++) {
String msg = "size dead msg" + i;
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
RabbitDeadLetterConfig.SIZE_ROUTING_KEY, msg);
}
}
}
消费者
重新定义的队列不进行消费,好使队列长度塞满。
测试
死信队列消费者能收到5条消息,证明这5条是队列塞满后进入死信队列的 剩余的5条仍在新定义的队列中
案例三:消息被拒绝
组件配置
同样新增一个队列绑定到案例一的交换机上,其他组件配置不变
@Bean
public Queue refusedQueue() {
Map<String, Object> param = new HashMap<>();
param.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
param.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
return new Queue(REFUSED_QUEUE_NAME, true, false,false, param);
}
@Bean
public Binding refusedBinding(Queue refusedQueue, DirectExchange normalExchange) {
return BindingBuilder.bind(refusedQueue).to(normalExchange).with(REFUSE_ROUTING_KEY);
}
生产者
发送10条消息到信新队列
@Component
@Slf4j
public class RefusedDeadLetterSender {
@Resource
private RabbitTemplate rabbitTemplate;
public void send() {
for (int i = 1; i < 11; i++) {
String msg = "refuseMsg" + i;
rabbitTemplate.convertAndSend(RabbitDeadLetterConfig.NORMAL_EXCHANGE_NAME,
RabbitDeadLetterConfig.REFUSE_ROUTING_KEY, msg);
}
}
}
消费者
如果是指定消息拒绝,其他消息正常消费
@Component
@Slf4j
public class RefuseMsgConsumer {
@RabbitListener(queues = RabbitDeadLetterConfig.REFUSED_QUEUE_NAME)
public void consume(Message msg, Channel channel) throws IOException {
String msgStr = new String(msg.getBody());
if ("refuseMsg10".equals(msgStr)) {
channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
log.info("{} refused by refuse consumer", msgStr);
} else {
channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
log.info("{} consume by refuse consumer", msgStr);
}
}
}
特别说明
消费者默认是自动确认的,这里很明显要开启消费者手动确认模式。 在配置文件中添加如下代码:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
测试
指定消息被拒绝进入了死信队列,其余消息被正常消费
|