1. 死信队列
1.1 概念
死信就是无法被消费的消息,消费者从队列中取消息时,由于某些特定原因导致消息无法被消费,即没有了后续的处理,就变成了死信继而有了死信队列。
1.2 应用场景
可以保证消息不会消失,如果消费者在进行消费时发送异常,可以先放到死信队列中,等后面运行环境好了之后再进行消费。
1.3 死信来源
- 消息TTL过期;
- 队列达到最大长度;
- 消息被拒绝。
1.4 死信实战
架构图:
- 正常情况下使用normal交换机绑定给normal队列C1进行消费;
- 出现异常情况就转发给 dead交换机绑定给dead队列让C2进行消费。
1.4.1 消息TTL过期
测试过程:
- 开启消费者1构建交换机队列RoutingKey之间的关系;
- 然后关闭消费者1;
- 开启生产者发送消息,等待十秒之后消息过期就会到达死信队列;
- 然后启动消费者2接收死信队列里面的消息。
消费者1:
public class Consumer1 {
private static final String NORMAL_QUEUE = "normal_queue";
private static final String DEAD_QUEUE = "dead_queue";
private static final String NORMAL_EXCHANGE = "e_ch";
private static final String DEAD_EXCHANGE = "d_ch";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
Map<String ,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false
,arguments);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("C1等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag ->{});
}
}
消费者2:
public class Consumer2 {
private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
System.out.println("C2等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
System.out.println("C2接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
};
channel.basicConsume(DEAD_QUEUE,true, deliverCallback, consumerTag ->{});
}
}
生产者:
public class Producer {
private static final String NORMAL_EXCHANGE = "e_ch";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();
for (int i = 0; i < 11; i++) {
String msg = "message" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功:" + msg);
}
}
}
1.4.2 队列达到最大长度
修改消费者1,指定最多长度为6条消息,超过这6条消息之后的消息会进入到死信中。 消费者1给村参数的map新增一条:
arguments.put("x-max-length",6);
删除生产者中对队列消息事件的限制。 如果出现406错误就是已经存在队列,可以删除原有队列或者修改一个新名字。
1.4.3 消息被拒
修改消费者1,拒绝最后一位为双数的消息。
消费者1:
public class Consumer1 {
private static final String NORMAL_QUEUE = "normal_queue1";
private static final String DEAD_QUEUE = "dead_queue1";
private static final String NORMAL_EXCHANGE = "e_ch1";
private static final String DEAD_EXCHANGE = "d_ch1";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
Map<String ,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false
,arguments);
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("C1等待接收消息...");
DeliverCallback deliverCallback = (consumerTag, message) ->{
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
if (Integer.parseInt(msg.substring(msg.length()-1)) % 2 == 0){
System.out.println("C1拒绝的消息:" + msg +"----------===========-----------");
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else{
System.out.println("C1接收到:" + new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
};
channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});
}
}
先启动消费者1创建对应关系,随之启动消费者2,再启动生产者。
生产者发送: 消费者1: 消费者2:
|