实现方式一:死信队列
AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。 但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:
1、Time To Live(TTL)
RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信) RabbitMQ针对队列中的消息过期时间有两种方法可以设置。 A: 通过队列属性设置,队列中所有消息都有相同的过期时间。 B: 对消息进行单独设置,每条消息TTL可以不同。
2、Dead Letter Exchanges(DLX)
RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
队列出现dead letter的情况
1、消息或者队列的TTL过期 2、队列达到最大长度 3、消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false 代码实现 首先加入依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.16</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
编写配置文件 application.yml
spring:
application:
name: delay-queue
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin123456
virtual-host: delays1
server:
port: 8082
编写队列配置文件
#订单业务队列的名称,交换机名称、路由key、超时时间
delay.bussiness.queue: order_time_out_15s
delay.bussiness.excahnge: order_time_out_15s_exchange
delay.bussiness.route: order_time_out_15s_776
#死信队列的名称,交换机名称、路由key、超时时间
delay.dead.queue: dead_order_time_out_15s
delay.dead.excahnge: dead_order_time_out_15s_exchange
delay.dead.route: dead_order_time_out_15s_666
delay.bussiness.order.timeout: 15000
#利用rabbitmq_delayed_message_exchange实现延迟队列的方式
#插件实现订单业务队列的名称,交换机名称、路由key、超时时间
delay.plugins.queue: plugin_order_delay_30
delay.plugins.exchange: plugin_order_exchange_30
delay.plugins.route.key: plugin_order_route_key_30
delay.plugin.timeout: 30000
编写配置读取类
@Configuration
@PropertySource("classpath:rabbitmqs.properties")
@Data
public class OrderDelayConfig {
@Value("${delay.bussiness.queue}")
private String orderDelayQueueName;
@Value("${delay.bussiness.excahnge}")
private String orderDelayExchangeName;
@Value("${delay.bussiness.route}")
private String orderDelayRouteKey;
@Value("${delay.dead.queue}")
private String orderDeadDelayQueueName;
@Value("${delay.dead.excahnge}")
private String orderDeadDelayExchangeName;
@Value("${delay.dead.route}")
private String orderDeadDelayRouteKey;
@Value("${delay.bussiness.order.timeout}")
private Long timeout;
@Value("${delay.plugins.queue}")
private String pluginOrderQueueName;
@Value("${delay.plugins.exchange}")
private String pluginOrderExchangeName;
@Value("${delay.plugins.route.key}")
private String pluginOrderRouteKey;
@Value("${delay.plugin.timeout}")
private Long pluginTimeout;
}
编写队列、交换机创建、交换机和队列、路由key值绑定的配置类
private final String dlexchange = "x-dead-letter-exchange";
private final String dlRouteKey = "x-dead-letter-routing-key";
private final String ttl = "x-message-ttl";
@Autowired
private OrderDelayConfig orderDelayConfig;
@Bean("orderDeadExchange")
public DirectExchange deadTopicExchange() {
return new DirectExchange(orderDelayConfig.getOrderDeadDelayExchangeName());
}
@Bean("orderExchange")
public DirectExchange payTopicExchange() {
return new DirectExchange(orderDelayConfig.getOrderDelayExchangeName());
}
@Bean("orderDeadQueue")
public Queue deadQueue() {
return new Queue(orderDelayConfig.getOrderDeadDelayQueueName());
}
@Bean("orderQueue")
public Queue payQueue() {
Map<String, Object> params = new HashMap<>();
params.put(ttl, orderDelayConfig.getTimeout());
params.put(dlexchange, orderDelayConfig.getOrderDeadDelayExchangeName());
params.put(dlRouteKey, orderDelayConfig.getOrderDeadDelayRouteKey());
return QueueBuilder.durable(orderDelayConfig.getOrderDelayQueueName()).withArguments(params).build();
}
@Bean
public Binding delayBindingA(@Qualifier("orderQueue") Queue queue,
@Qualifier("orderExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getOrderDelayRouteKey());
}
@Bean
public Binding BindingErrorQueueAndExchange(@Qualifier("orderDeadQueue") Queue deadQueue,
@Qualifier("orderDeadExchange") DirectExchange exchange) {
return BindingBuilder.bind(deadQueue).to(exchange).with(orderDelayConfig.getOrderDeadDelayRouteKey());
}
以上已经完成了配置工作,下面需要完成业务代码实现 1、新建订单实体类
@Data
public class Order {
private String orderNo;
private BigDecimal price;
private int prodductNum;
private BigDecimal totalAmount;
private Date createTime;
}
定义消息发送和消息消费
@Component
@EnableScheduling
@Slf4j
public class OrderDelayQueue {
private RabbitTemplate rabbitTemplate;
private OrderDelayConfig orderDelayConfig;
private final static String orderQueueName = "dead_order_time_out_15s";
public OrderDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
this.rabbitTemplate = rabbitTemplate;
this.orderDelayConfig = orderDelayConfig;
}
@Scheduled(cron = "0/30 * * * * ?")
public void sendOrderMsg() {
Order order;
for (int i = 0; i < 3; i++) {
order = new Order();
order.setOrderNo(new Snowflake().nextIdStr());
order.setCreateTime(new Date());
rabbitSendMsg(JSON.toJSONString(order));
}
}
public void rabbitSendMsg(String msg) {
rabbitTemplate.convertAndSend(orderDelayConfig.getOrderDelayExchangeName(),
orderDelayConfig.getOrderDelayRouteKey(), msg);
}
@RabbitListener(queues = orderQueueName)
public void infoConsumption(String data) throws Exception {
final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
final Order order = JSONObject.parseObject(data, Order.class);
final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
log.info(order.getOrderNo() + "死信队列========:订单已经超时了" + "失效时间" + diff + "秒");
}
}
到此完成全部代码,启动项目执行即可完成死信队列实现延迟队列功能。
方式2:利用rabbitmq_delayed_message_exchange实现延迟队列
安装插件
1、下载延时消息插件:https://www.rabbitmq.com/community-plugins.html 2、将下载的文件放在rabbitmq的安装路径plugins文件中
安装插件 打开rabbitmq的命令界面 执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 入上图所示表示安装成功
代码实现
配置信息,在第一种方式中已配置
1、队列、交换机、路由key值创建,绑定
@Bean("pluginOrderExchange")
public CustomExchange pluginExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(orderDelayConfig.getPluginOrderExchangeName(),
"x-delayed-message", true, false, args);
}
@Bean("pluginOrderQueue")
public Queue pluginQueue() {
return new Queue(orderDelayConfig.getPluginOrderQueueName());
}
@Bean
public Binding delayBindingPlugin(@Qualifier("pluginOrderQueue") Queue queue,
@Qualifier("pluginOrderExchange") CustomExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getPluginOrderRouteKey()).noargs();
}
2、消息发送,消息消费
@Component
@EnableScheduling
@Slf4j
public class OrderPluginDelayQueue {
private RabbitTemplate rabbitTemplate;
private OrderDelayConfig orderDelayConfig;
private final static String orderQueueName = "plugin_order_delay_30";
public OrderPluginDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
this.rabbitTemplate = rabbitTemplate;
this.orderDelayConfig = orderDelayConfig;
}
@Scheduled(cron = "0/30 * * * * ?")
public void sendOrderMsg() {
Order order;
for (int i = 0; i < 3; i++) {
order = new Order();
order.setOrderNo(new Snowflake().nextIdStr());
order.setCreateTime(new Date());
rabbitSendMsg(JSON.toJSONString(order),orderDelayConfig.getPluginTimeout().intValue());
}
}
public void rabbitSendMsg(String msg,int delayTime) {
rabbitTemplate.convertAndSend(orderDelayConfig.getPluginOrderExchangeName(),
orderDelayConfig.getPluginOrderRouteKey(), msg,s->{
s.getMessageProperties().setDelay(delayTime);
return s;
});
}
@RabbitListener(queues = orderQueueName)
public void infoConsumption(String data) throws Exception {
final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
final Order order = JSONObject.parseObject(data, Order.class);
final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
log.info(order.getOrderNo() + "插件实现=============订单已经超时了" + "失效时间" + diff + "秒");
}
}
以上完成第二种方式代码 实现结果截图
|