RabbitMq 延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景
- 订单在十分钟之内未支付则自动取消 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
延迟队列构架图
下边的代码是按照这样的构架写的
声明交换机和相关队列
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE="X";
public static final String Y_DEAD_LETTER_EXCHANGE="Y";
public static final String QUEUE_C="QC";
public static final String DEAD_LETTER_QUEUE="QD";
@Bean("xExechange")
public DirectExchange xExechange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExechange")
public DirectExchange yExechange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueC")
public Queue queueC(){
HashMap<String, Object> hashMap = Maps.newHashMap();
hashMap.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
hashMap.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(hashMap).build();
}
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExechange")DirectExchange xExechange){
return BindingBuilder.bind(queueC).to(xExechange).with("XC");
}
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExechange")DirectExchange xExechange){
return BindingBuilder.bind(queueD).to(xExechange).with("YD");
}
}
消息生产者代码编写
@RestController
@RequestMapping("/send/message/")
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("{msg}/{ttlTime}")
public String sendMessage(@PathVariable("msg") final String msg, @PathVariable("ttlTime")String ttlTime){
rabbitTemplate.convertAndSend(TtlQueueConfig.X_EXCHANGE,"XC",msg,message->{
message.getMessageProperties().setExpiration(ttlTime);
return message;
});
return msg;
}
}
消息消费者代码编写
@Slf4j
@Component
public class DeadLetterQueueCustomer {
@RabbitListener(queues = TtlQueueConfig.DEAD_LETTER_QUEUE)
public void received(Message message, Channel channel){
String msg = new String(message.getBody());
System.out.println("msg = " + msg);
}
}
启动项目
在浏览器中输入对应的url地址发送消息和消息延迟时间,在对应的时间之后就会获取到相关的消息
|