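RabbitMQ Dead Letter Queues
A message is routed to the dead letter queue under any of these conditions:
1. The message's TTL (time to live) expires.
2. The queue reaches its length limit.
3. A consumer rejects the message without requeueing it.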
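The three conditions above can be sketched with a toy in-memory model. This is only an illustration in plain Java, not the RabbitMQ client API: the class `DeadLetterModel` and its methods are hypothetical names, time is passed in explicitly as milliseconds, and the reason strings mirror the `expired`, `maxlen`, and `rejected` values that RabbitMQ records for dead-lettered messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

/** Toy model of dead-lettering (hypothetical, NOT the RabbitMQ API). */
public class DeadLetterModel {
    /** A queued message and the time at which its TTL elapses. */
    private static final class Entry {
        final String body;
        final long expiresAtMillis;
        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>(); // "reason:body"
    private final int maxLength;
    private final long ttlMillis;

    public DeadLetterModel(int maxLength, long ttlMillis) {
        this.maxLength = maxLength;
        this.ttlMillis = ttlMillis;
    }

    /** Condition 2: when the queue is full, the oldest message is dead-lettered. */
    public void publish(String body, long nowMillis) {
        expire(nowMillis);
        if (queue.size() >= maxLength) {
            deadLetters.add("maxlen:" + queue.pollFirst().body);
        }
        queue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    /** Condition 1: messages whose TTL has elapsed are dead-lettered. */
    public void expire(long nowMillis) {
        while (!queue.isEmpty() && queue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetters.add("expired:" + queue.pollFirst().body);
        }
    }

    /** Condition 3: a rejected message is dead-lettered only if it is not requeued. */
    public void rejectHead(boolean requeue, long nowMillis) {
        expire(nowMillis);
        Entry head = queue.pollFirst();
        if (head == null) {
            return; // nothing to reject
        }
        if (requeue) {
            queue.addFirst(head);
        } else {
            deadLetters.add("rejected:" + head.body);
        }
    }

    public List<String> deadLetters() {
        return Collections.unmodifiableList(new ArrayList<>(deadLetters));
    }

    public int size() {
        return queue.size();
    }
}
```

For example, publishing "a", "b", and "c" into `new DeadLetterModel(2, 5000)` dead-letters "a" with reason `maxlen`, because the model drops from the head of the queue when it is full.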
Using a direct exchange
Message TTL test
1. RabbitMQ configuration class, with the queue's message TTL set to 5 seconds
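import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    // Queue that receives dead-lettered messages
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    // Exchange that dead-lettered messages are published to
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }

    // Business queue: messages expire after 5 seconds and are then dead-lettered
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-message-ttl", 5000);
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}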
2. Send a message
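import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // Publishes the request parameter "message" to directQueue via directExchange
    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message) {
        rabbitTemplate.convertAndSend("directExchange", "routingkey001", message);
        return "ok";
    }
}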
No consumer receives the message here, so it expires after 5 seconds and is moved to the dead letter queue.
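The queue declarations in this article differ only in the argument map they pass to `new Queue(...)`: a TTL, a maximum length, or neither. A small helper could build that map in one place. The sketch below is an assumption for illustration: `DeadLetterArgs` and its `of` method are hypothetical names, not part of Spring AMQP; only the argument keys are the ones used in the configuration classes.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not part of Spring AMQP) that builds dead-letter queue arguments. */
public final class DeadLetterArgs {
    private DeadLetterArgs() {
    }

    /**
     * @param deadLetterExchange   exchange that dead-lettered messages are published to
     * @param deadLetterRoutingKey routing key used when dead-lettering
     * @param ttlMillis            per-queue message TTL in milliseconds, or null for no TTL
     * @param maxLength            maximum number of messages in the queue, or null for no limit
     */
    public static Map<String, Object> of(String deadLetterExchange, String deadLetterRoutingKey,
                                         Integer ttlMillis, Integer maxLength) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        if (ttlMillis != null) {
            args.put("x-message-ttl", ttlMillis);
        }
        if (maxLength != null) {
            args.put("x-max-length", maxLength);
        }
        return args;
    }
}
```

With such a helper, the TTL queue would be declared as `new Queue("directQueue", true, false, false, DeadLetterArgs.of("TDL_EXCHANGE", "TDL_KEY", 5000, null))`, and the length-limited queue would pass `null, 5` as the last two arguments instead.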
Queue length limit
1. Modify the RabbitMQ configuration class to set the maximum queue length to 5
@Configuration
public class DirectRabbitConfig {

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }

    // Business queue capped at 5 messages; overflow is dead-lettered
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-max-length", 5);
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}
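Once the queue holds more than 5 messages, the excess is moved to the dead letter queue. With RabbitMQ's default overflow behavior (drop-head), the messages that are dead-lettered are the oldest ones at the head of the queue.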
Consumer rejects the message without requeueing it
Producer code
1. RabbitMQ configuration class
@Configuration
public class DirectRabbitConfig {

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }

    // Business queue with no TTL or length limit; only rejected messages are dead-lettered
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}
2. Send a message
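import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SendController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("sendMessage")
    @ResponseBody
    public String sendMessage(String message) {
        // Wrap the payload in a Message so it is sent as raw bytes; if your Spring AMQP
        // version lacks this constructor, use new Message(bytes, new MessageProperties())
        Message message1 = new Message(message.getBytes(StandardCharsets.UTF_8));
        rabbitTemplate.convertAndSend("directExchange", "routingkey001", message1);
        return "ok";
    }
}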
Consumer code
1. Configuration file: enable manual acknowledgment
spring:
  application:
    name: rabbitmq-consumer
  rabbitmq:
    host: 47.110.157.82
    port: 5672
    username: liu
    password: 123456
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual
2. RabbitMQ configuration class
@Configuration
public class DirectRabbitConfig {

    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return new Queue("TDL_QUEUE");
    }

    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("TDL_EXCHANGE").durable(true).build();
    }

    @Bean
    public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
        return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("TDL_KEY");
    }

    // Must declare the same arguments as the producer side, or the declaration will conflict
    @Bean
    public Queue directQueue() {
        Map<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", "TDL_EXCHANGE");
        map.put("x-dead-letter-routing-key", "TDL_KEY");
        return new Queue("directQueue", true, false, false, map);
    }

    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }

    @Bean
    Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("routingkey001");
    }
}
3. Receive messages
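import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiveMessage {

    @RabbitListener(queues = "directQueue")
    public void process(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("directReceiver consumer received message: " + message);
            int a = 10 / 0; // deliberately throws ArithmeticException to simulate a failure
            // Not reached in this demo; multiple=true also acks earlier unacked deliveries
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // multiple=true, requeue=false: reject without requeueing, so the broker dead-letters it
            channel.basicNack(deliveryTag, true, false);
        }
    }
}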
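Because the consumer throws an exception while processing, the message is never acknowledged. It is nacked with requeue set to false, so it is not returned to the original queue and is moved to the dead letter queue instead.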
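To confirm why a message ended up in TDL_QUEUE, a consumer on that queue can inspect the `x-death` header that RabbitMQ adds when it dead-letters a message. The header is a list of tables, each with fields such as `queue`, `reason`, and `count`. The sketch below is a plain-Java assumption of how the reason could be read from a header map: `XDeath` and `firstReason` are hypothetical names, and in Spring AMQP the map would come from `message.getMessageProperties().getHeaders()`. It reads the first list entry, which to my understanding is the most recent dead-lettering event.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper that reads the dead-letter reason from message headers. */
public final class XDeath {
    private XDeath() {
    }

    /**
     * Returns the "reason" field of the first x-death entry
     * (for example "expired", "maxlen" or "rejected"), or null if the header is absent.
     */
    public static String firstReason(Map<String, Object> headers) {
        Object raw = headers.get("x-death");
        if (!(raw instanceof List) || ((List<?>) raw).isEmpty()) {
            return null; // message was never dead-lettered
        }
        Object first = ((List<?>) raw).get(0);
        if (!(first instanceof Map)) {
            return null; // unexpected header shape
        }
        Object reason = ((Map<?, ?>) first).get("reason");
        // toString() also handles non-String values the client library may deliver
        return reason == null ? null : reason.toString();
    }
}
```

For the three tests in this article, the expected reasons would be `expired` for the TTL test, `maxlen` for the length-limit test, and `rejected` for the nack test.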