1、SpringBoot整合RabbitMQ
1.1、Direct 模式(路由模式)
-
P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key -
X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列 -
C1:消费者,其所在队列指定了需要 routing key 为 error 的消息 -
C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
模式说明:
-
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key) -
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey -
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息- 的 Routing key 完全一致,才会接收到消息
1.2、Topic 模式(通配符模式)
- Queue Q1:绑定的routing key是 *.orange.*,因此第二个单词是orange并且含有第三个单词的routing key 都会被匹配到。例如:tomatoes.orange.small
- Queue Q2:绑定的routing key 分别是*.*.rabbite 和Lazy.# ,因此第三个单词rabbite或以Lazy开头的routing key 都会被匹配
模式说明:
-
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符 -
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert -
通配符规则:# 匹配一个或多个词 ,* 匹配不多不少恰好1个词 ,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
1.3、Fanout模式(广播模式)
不处理路由键 ,只需要简单的将队列绑定到交换机上发送到交换机的消息都会被转发到与该交换机绑定的所有队列上 。
Fanout交换机转发消息是最快的 。
1.4、代码整合
-
添加依赖 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
-
配置整合
- yaml配置参数
spring:
rabbitmq:
host: 192.168.0.134
port: 5672
username: admin
password: admin
virtual-host: /admin
- 生产者配置文件
@Configuration
public class RabbitMqConfig {
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE = "topic_queue";
public static final String TOPIC_ROUTING = "topic.routing.#";
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE = "direct_queue";
public static final String DIRECT_ROUTING = "direct.routing";
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE_01 = "fanout_queue_01";
public static final String FANOUT_QUEUE_02 = "fanout_queue_02";
@Bean(TOPIC_EXCHANGE)
public Exchange topicExchange(){
return ExchangeBuilder.topicExchange(TOPIC_EXCHANGE).durable(true).build();
}
@Bean(TOPIC_QUEUE)
public Queue topicQueue(){
return QueueBuilder.durable(TOPIC_QUEUE).build();
}
@Bean
public Binding topicQueueExchange(@Qualifier(TOPIC_QUEUE) Queue queue, @Qualifier(TOPIC_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(TOPIC_ROUTING).noargs();
}
@Bean(DIRECT_EXCHANGE)
public Exchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).build();
}
@Bean(DIRECT_QUEUE)
public Queue directQueue(){
return QueueBuilder.durable(DIRECT_QUEUE).build();
}
@Bean
public Binding directQueueExchange(@Qualifier(DIRECT_QUEUE) Queue queue, @Qualifier(DIRECT_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DIRECT_ROUTING).noargs();
}
@Bean(FANOUT_EXCHANGE)
public Exchange fanoutExchange(){
return ExchangeBuilder.fanoutExchange(FANOUT_EXCHANGE).durable(true).build();
}
@Bean(FANOUT_QUEUE_01)
public Queue fanout01Queue(){
return QueueBuilder.durable(FANOUT_QUEUE_01).build();
}
@Bean(FANOUT_QUEUE_02)
public Queue fanout02Queue(){
return QueueBuilder.durable(FANOUT_QUEUE_02).build();
}
@Bean
public Binding fanout01QueueExchange(@Qualifier(FANOUT_QUEUE_01) Queue queue, @Qualifier(FANOUT_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
@Bean
public Binding fanout02QueueExchange(@Qualifier(FANOUT_QUEUE_02) Queue queue, @Qualifier(FANOUT_EXCHANGE) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
}
- 生产者消息推送,需要spring-boot-starter-test测试包,根据实际情况引入
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendTopic() {
rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE, RabbitMqConfig.TOPIC_ROUTING, "topic模式消息");
}
@Test
public void sendDirect() {
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING, "direct模式消息");
}
@Test
public void sendFanout() {
rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE,"", "fanout模式消息");
}
}
-
消费者接收类
@Component
public class DirectListener {
@RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE)
public void ListenerQueue(Message message){
System.out.println("direct message:"+new String(message.getBody()));
}
}
@Component
public class Fanout01Listener {
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_01)
public void ListenerQueue(Message message){
System.out.println("fanout01 message:"+new String(message.getBody()));
}
}
@Component
public class Fanout02Listener {
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_02)
public void ListenerQueue(Message message){
System.out.println("fanout02 message:"+new String(message.getBody()));
}
}
@Component
public class TopicListener {
@RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE)
public void ListenerQueue(Message message){
System.out.println("topic message:"+new String(message.getBody()));
}
}
|