RabbitMQ工作流程
左边是生产者,右边是消费者,中间红框内是RabbitMQ服务器,其中包括交换机以及消息队列
消费者发送消息给RabbitMQ服务器,交换机接收到消息,然后根据不同的交换机规则投递给消息队列,消费者订阅消息进行消费
RabbitMQ交换机类型
作用:接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout三种,其他的由于性能不好等原因几乎用不到,后续主要介绍这三个。
Direct Exchange ----- 直连型交换机,根据消息携带的路由键将消息投递给对应队列。
Topic Exchange ----- 主题交换机,根据一定规则将消息投递给对应队列。
Fanout Exchange ----- 扇型交换机,这个交换机没有路由键概念,这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
编写RabbitMQ示例
为了更加直观的进行演示,本示例会创建两个项目,均采用SpringBoot作为框架,一个生产者(rabbitmq-provider),一个消费者(rabbitmq-consumer),项目大家自行创建。
<!--web相关依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# 生产者配置
server:
port: 8001
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 消费者配置
server:
port: 8002
spring:
application:
name: rabbitmq-consumer
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
package com.chentawen.rabbitmqprovider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Author: CTW
* @Date: create in 2021/8/2 21:23
*/
@Configuration
public class DirectExchangeConfig {
/**
* 声明直连交换机
*
* @return
*/
@Bean
DirectExchange MyDirectExchange() {
return new DirectExchange("MyDirectExchange", true, false);
}
/**
* 声明队列
*
* @return
*/
@Bean
Queue MyDirectQueue() {
return new Queue("MyDirectQueue", true);
}
/**
* 将交换机和队列进行绑定
*
* @return
*/
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(MyDirectQueue()).to(MyDirectExchange()).with("DirectRoutingKey");
}
}
package com.chentawen.rabbitmqprovider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
* @Author: CTW
* @Date: create in 2021/8/2 21:23
*/
@RestController
public class SendMessageController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息至直连交换机
*
* @return
*/
@GetMapping("/sendMessageDirectExchange")
public String sendMessageDirectExchange() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Hello World!";
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss"));
Map<String, Object> map = new HashMap<>(16);
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
/**
* exchange 交换机名称
* routingKey 路由key
* map 发送的消息内容
*/
rabbitTemplate.convertAndSend("MyDirectExchange", "DirectRoutingKey", map);
return "消息发送成功!";
}
}
- 查看RabbitMQ监控
概述页面:准备好的消息和消息总数都是1
交换机页面可以看到刚刚创建的直连交换机: 队列页面可以看到刚刚创建的队列及消息数量
package com.chentawen.rabbitmqconsumer.listener;//package com.chentawen.springbootall.config.rabbitlistener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @Author: CTW
* @Date: create in 2021/8/2 22:23
* MyDirectQueue 监听的队列名称
*/
@Component
@RabbitListener(queues = "MyDirectQueue")
public class DirectReceiver {
@RabbitHandler
public void process(Map MessageData) {
System.out.println("rabbitmq-consumer接收到消息 : " + MessageData.toString());
}
}
后续发送的消息都是实时消费的
在RabbitMQ监控中也可以看到消息被消费掉了
- 重启项目,发送消息,查看控制台
可以看到多个消费者去消费的话是以轮询的方式进行消费,不会重复消费
以上就是本期内容,后续内容持续更新
|