(一)RabbitMq开启延迟队列
rabbitmq 延迟队列的实现,需要插件的支持
参考 :docker 安装 rabbitmq并添加延迟队列插件
(二)项目设置
(1)依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)yml配置
spring:
rabbitmq:
host: xxx
port: xxx
username: xxx
password: xxx
(三)延迟消息实现核心配置
延迟对列的实现,实质主要是要定义且配置一个延迟交换机
@Bean
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange("test-delayed-exchange", "x-delayed-message", true, false, args);
}
核心:
自定义参数定义交换机类型:
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
申明交换机中的消息为延迟消息:
上边CustomExchange 构造器中的第二个参数x-delayed-message
我们接下来进行一个完整的配置定义,我们定义交换机,定义队列,队列交换机路由件绑定
package com.leilei.delayed;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMqDelayedConfig {
@Bean
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange("test-delayed-exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("test-delayed-queue").build();
}
@Bean
public Binding delayQueueBindExchange(Exchange delayExchange, Queue delayQueue) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with("leilei").noargs();
}
}
(四)发送延迟消息
(1)延迟消息生产者
延迟消息的发送,与普通队列发送消息大部分一致,但我们需要对message做额外的延时配置
EX:
@Service
public class DelayedProvider {
private final AmqpTemplate rabbitTemplate;
@Autowired
public DelayedProvider(AmqpTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void send(int delayedTime, String messageBody) {
rabbitTemplate.convertAndSend("test-delayed-exchange", "leilei", messageBody, message -> {
message.getMessageProperties().setDelay(delayedTime);
System.out.println("消息发送时间:" + LocalDateTime.now()
.format(DateTimeFormatter.ofPattern("yyy-MM-dd HH:mm:ss")) +
"消息内容:" + messageBody);
return message;
});
}
}
(2)延迟消息消费者
消费者端与普通的MQ消费者,一致,只需要监听对应的队列即可
@Service
public class DelayedConsumer {
@RabbitListener(queues = "test-delayed-queue")
public void testListenerDelayedMessage(Message message) {
byte[] body = message.getBody();
System.out.println(LocalDateTime.now() + new String(body));
}
}
项目链接:Springboot+RabbitMQ
|