一般来说,生产者将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,如果它一直无法消费某条数据,那么可以把这条消息放入死信队列里面。等待 条件满足了再从死信队列中取出来再次消费,从而避免消息丢失。
编写可根据上篇博客来
交换机的使用
1,生产者创建队列和交换机
package com.lgs.scz.mq;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@SuppressWarnings("all")
public class DeadConfig {
/**
* 创建队列
*/
@Bean
public Queue normalQueue(){
Map<String,Object> config=new HashMap<>();
//过期时间
config.put("x-message-ttl", 10000);
//死信交换机
config.put("x-dead-letter-exchange", "deadExchange");
//死信routing key
config.put("x-dead-letter-routing-key", "DD");
return new Queue("normalQueue",true,false,false,config);
}
@Bean
public Queue deadQueue(){
return new Queue("deadQueue",true);
}
/**
* 创建交换机
*/
@Bean
public DirectExchange normalExchange() {
return new DirectExchange("normalExchange");
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("deadExchange");
}
/**
* 进行交换机和队列的绑定
*/
@Bean
public Binding normalBinding() {
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with("CC");
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("DD");
}
}
2,生产者发送信息
package com.lgs.scz.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@SuppressWarnings("all")
@Slf4j
public class ProviderController {
@Autowired
private RabbitTemplate template;
@RequestMapping("/deadSend")
public String deadSend(){
log.warn("订单已经保存");
template.convertAndSend("normalExchange","CC","order-1902");
return "yes";
}
}
3,消费者接收信息
package com.lgs.xfz.mq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "deadQueue")
@Slf4j
public class DeadReceiver {
@RabbitHandler
public void process(String message){
log.warn(message+":该订单已经过期");
}
}