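This post walks through a delay-queue scenario. A message is first published to a queue that has no consumer; after it has sat in that queue for n seconds, it moves on to the next processing step. Here, that next step is dead-lettering: the expired message is routed to a dead-letter queue and consumed from there. The code follows.
Maven dependencies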
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
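Define a global configuration class that declares the exchanges, queues, and bindings. This example uses annotation-based configuration; XML configuration would work as well.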
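import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration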
public class MqConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// Declare the normal exchange X
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}
// Declare the dead-letter exchange Y
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
// Declare queue A with a 10 s TTL and attach it to the dead-letter exchange
@Bean("queueA")
public Queue queueA() {
Map<String, Object> args = new HashMap<>(3);
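// Dead-letter exchange that expired messages from this queue are sent to
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// Routing key used when a message is dead-lettered
args.put("x-dead-letter-routing-key", "YD");
// Queue-level message TTL in milliseconds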
args.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// Bind queue A to exchange X with routing key XA
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// Declare queue B with a 40 s TTL and attach it to the dead-letter exchange
@Bean("queueB")
public Queue queueB() {
Map<String, Object> args = new HashMap<>(3);
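// Dead-letter exchange that expired messages from this queue are sent to
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// Routing key used when a message is dead-lettered
args.put("x-dead-letter-routing-key", "YD");
// Queue-level message TTL in milliseconds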
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
// Bind queue B to exchange X with routing key XB
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
// Declare the dead-letter queue QD
@Bean("queueD")
public Queue queueD() {
return new Queue(DEAD_LETTER_QUEUE);
}
// Bind the dead-letter queue QD to exchange Y with routing key YD
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange) {
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
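Queues QA and QB differ only in their TTL, so their argument maps could be built by one shared helper. The following is a minimal sketch under that assumption: `DelayQueueArgs` and its `build` method are hypothetical names introduced here, not part of Spring AMQP or of the configuration above. Only the three argument keys are taken from that configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring AMQP.
final class DelayQueueArgs {
    private DelayQueueArgs() {}

    // Builds the queue arguments for a queue whose messages expire after
    // ttlMillis and are then republished to the given dead-letter exchange.
    static Map<String, Object> build(String deadLetterExchange,
                                     String deadLetterRoutingKey,
                                     int ttlMillis) {
        if (ttlMillis < 0) {
            throw new IllegalArgumentException("ttlMillis must not be negative");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        args.put("x-message-ttl", ttlMillis);
        return args;
    }

    public static void main(String[] unused) {
        // Arguments equivalent to those of QA (10 s) and QB (40 s)
        System.out.println(build("Y", "YD", 10_000));
        System.out.println(build("Y", "YD", 40_000));
    }
}
```

Inside `MqConfig`, the returned map could then be passed to `QueueBuilder.durable(QUEUE_A).withArguments(...)`, leaving the TTL as the only value that varies between the two queue beans.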
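Define the producer
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// @RestController instead of @Controller: with a void handler method,
// a plain @Controller makes Spring MVC try to resolve a view for the request.
@RestController
@Slf4j
public class delayProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
// Note: the path variable is sent as the message body;
// the actual delay comes from the TTL configured on each queue.
@RequestMapping("/send/{ttl}")
public void send(@PathVariable String ttl){
log.info("---->send message to X with routing key XA----->");
rabbitTemplate.convertAndSend("X","XA",ttl);
log.info("---->send message to X with routing key XB----->");
rabbitTemplate.convertAndSend("X","XB",ttl);
log.info("send end!!!");
}
}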
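Define the consumers. To make it easy to verify that messages consumed before their TTL expires never show up in the dead-letter queue, I define a consumer for every queue here.
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j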
@Component
public class DelayComsume {
@RabbitListener(queues = "QD")
public void consumeQD(Message message){
String msg = new String(message.getBody());
log.info("QD consumer------>"+msg);
}
@RabbitListener(queues = "QA")
public void consumeQA(Message message){
String msg = new String(message.getBody());
log.info("QA consumer------>"+msg);
}
@RabbitListener(queues = "QB")
public void consumeQB(Message message) throws InterruptedException {
Thread.sleep(3000);
String msg = new String(message.getBody());
log.info("QB consumer------>"+msg);
}
}
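Result: while the QA and QB listeners are present, the messages are consumed before their TTL expires, so the dead-letter queue has nothing to consume. If the QA and QB listeners are removed, the messages are dead-lettered to QD and consumed there once the delay has elapsed.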
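The two outcomes can be illustrated without a broker. The following is a simplified in-memory model, not RabbitMQ's actual implementation: `TtlQueueModel` and its methods are hypothetical names introduced for illustration, and timestamps are passed in explicitly instead of being read from a clock. A message consumed before the TTL elapses never reaches the dead-letter list, while a message that stays in the queue past the TTL is moved there.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of a queue with a queue-level TTL
// and a dead-letter target. Times are in milliseconds.
final class TtlQueueModel {
    private static final class Entry {
        final String body;
        final long enqueuedAt;
        Entry(String body, long enqueuedAt) {
            this.body = body;
            this.enqueuedAt = enqueuedAt;
        }
    }

    private final long ttlMillis;
    private final Deque<Entry> queue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();

    TtlQueueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    void publish(String body, long now) {
        queue.addLast(new Entry(body, now));
    }

    // Moves every message whose TTL has elapsed at time `now` to the
    // dead-letter list. With one TTL for the whole queue, messages
    // expire in FIFO order, so checking the head is sufficient.
    void expire(long now) {
        while (!queue.isEmpty() && now - queue.peekFirst().enqueuedAt >= ttlMillis) {
            deadLetters.add(queue.pollFirst().body);
        }
    }

    // Returns the oldest unexpired message, or null if none is left.
    String consume(long now) {
        expire(now);
        Entry head = queue.pollFirst();
        return head == null ? null : head.body;
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        TtlQueueModel qa = new TtlQueueModel(10_000);
        qa.publish("hello", 0);
        System.out.println(qa.consume(3_000));   // prints "hello": consumed before the TTL
        qa.publish("world", 3_000);
        qa.expire(13_000);                       // no consumer showed up within 10 s
        System.out.println(qa.deadLetters());    // prints "[world]"
    }
}
```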