死信队列的概念:
用来存放变成死信的消息的队列
死信产生的途径:
- 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
- 消息在队列的存活时间超过设置的TTL时间
- 消息队列的消息数量已经超过最大队列长度
如何配置死信队列
- 配置死信队列,配置死信交换机,将死信队列绑定到死信交换机上,指定路由key
- 配置业务队列(同时绑定对应的死信交换机和路由key),配置业务交换机,再将业务队列绑定到业务交换机
在需要使用死信的业务队列中配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。 有了死信交换机和路由key后,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。 也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,然后死信的产生途径不是客户端发送的,而是由上面说的几种方式产生的。
用法
首先消息消费端改成 消息手动确认
# 开启手动确认消息,这样订阅者必须要手动 ack
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
消息发送端:
String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead_letter_key";
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue("deadLetterQueueA");
}
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange("deadLetterExchange");
}
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange("businessExchange");
}
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "deadLetterExchange");
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable("businessQueueA").withArguments(args).build();
}
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
消息接收端(消费端)
@RabbitListener(queues = "businessQueueA")
public void receivebusinessQueueA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
if(msg.equals("1")){
System.out.println("收到业务消息:" + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}else{
System.out.println("消息消费异常,放进死信队列");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = "deadLetterQueueA")
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
发送消息的接口
@GetMapping("/send6")
@ResponseBody
public String send6(String index){
template.convertAndSend("businessExchange","",index);
return "发送消息到队列成功";
}
|