引入依赖
创建 maven 项目 RabbirMQ,引入所使用的依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.20</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
RabbitMQ 生产者
配置文件以及主启动类
首先在项目中创建子 maven 项目 RabbitProducer,配置 RabbitMQ 服务地址
server:
port: 8081
spring:
rabbitmq:
host: <RabbitMq Server ip>
port: 5672
创建主启动类
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class);
}
}
配置 RabbitMQ 的 Exchange 以及 Queue
此处分别配置两种模式的交换机 Exchange,即 Direct 模式和 Topic 模式,有关交换机模式的内容请阅读RabbitMQ 基础
Direct Exchange
@Configuration
public class RabbitConfigFroDirect {
@Bean
public Queue directQueue01() {
return new Queue("My_Direct_Queue01");
}
@Bean
public Queue directQueue02() {
return new Queue("My_Direct_Queue02");
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange("My_Direct_Exchange");
}
@Bean
public Binding bindingDirect01() {
return BindingBuilder.bind(directQueue01()).to(directExchange()).with("Queue01");
}
@Bean
public Binding bindingDirect02() {
return BindingBuilder.bind(directQueue02()).to(directExchange()).with("Queue02");
}
}
Topic Exchange
@Configuration
public class RabbitConfigForTopic {
@Bean
public Queue topicQueue01() {
return new Queue("My_Topic_Queue01");
}
@Bean
public Queue topicQueue02() {
return new Queue("My_Topic_Queue02");
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("My_Topic_Exchange");
}
@Bean
public Binding bindingTopic01() {
return BindingBuilder.bind(topicQueue01()).to(topicExchange()).with("*.topic");
}
@Bean
public Binding bindingTopic02() {
return BindingBuilder.bind(topicQueue02()).to(topicExchange()).with("#.topic");
}
}
编写消息发送的服务类
编写消息发送服务类的接口,定义发送消息的方法,需要指定 Routing Key 以及消息内容
public interface ProducerService {
String sendMessage(String routingKey, String detail);
}
对不同模式的 Exchange 分别实现上述接口,实现消息发送的方法。在调用消息发送的方法时,需要指定 Exchange、Routing Key 以及 消息内容
@Service
public class DirectProducerServiceImpl implements ProducerService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public String sendMessage(String routingKey, String detail) {
amqpTemplate.convertAndSend("My_Direct_Exchange", routingKey, detail);
return "消息发送成功:{Exchange:" + "My_Direct_Exchange|" + "Routing Key:" + routingKey + "|data:" + detail + "}";
}
}
@Service
public class TopicProducerServiceImpl implements ProducerService {
@Resource
private AmqpTemplate amqpTemplate;
@Override
public String sendMessage(String routingKey, String detail) {
amqpTemplate.convertAndSend("My_Topic_Exchange", routingKey, detail);
return "消息发送成功:{Exchange:" + "My_Topic_Exchange|" + "Routing Key:" + routingKey + "|data:" + detail + "}";
}
}
编写控制层接收请求并调用消息发送服务
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Resource(name = "directProducerServiceImpl")
private ProducerService directProducerService;
@Resource(name = "topicProducerServiceImpl")
private ProducerService topicProducerService;
@GetMapping ("/direct/{routingKey}/{detail}")
public String sendForDirect(@PathVariable("routingKey") String routingKey,
@PathVariable("detail") String detail) {
return directProducerService.sendMessage(routingKey, detail);
}
@GetMapping ("/topic/{routingKey}/{detail}")
public String sendForTopic(@PathVariable("routingKey") String routingKey,
@PathVariable("detail") String detail) {
return topicProducerService.sendMessage(routingKey, detail);
}
}
RabbitMQ 消费者
配置文件以及主启动类
创建子 maven 项目 RabbitConsumer,编写配置文件
server:
port: 8081
spring:
rabbitmq:
host: <RabbitMq Server ip>
port: 5672
创建主启动类
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
编写消息接收服务
使用@RabbitListener 注解监听指定的队列,获取数据并进行处理即可
注:一般消息的传输使用 JSON 字符串的形式实现
@Component
public class RabbitReceiver {
@RabbitListener(queues = "My_Direct_Queue01")
public void directReceiver01(String detail) {
System.out.println("消息接收成功:{" + "Queue:My_Direct_Queue01|data:" + detail + "}");
}
@RabbitListener(queues = "My_Direct_Queue02")
public void directReceiver02(String detail) {
System.out.println("消息接收成功:{" + "Queue:My_Direct_Queue02|data:" + detail + "}");
}
@RabbitListener(queues = "My_Topic_Queue01")
public void topicReceiver01(String detail) {
System.out.println("消息接收成功:{" + "Queue:My_Topic_Queue01|data:" + detail + "}");
}
@RabbitListener(queues = "My_Topic_Queue02")
public void topicReceiver02(String detail) {
System.out.println("消息接收成功:{" + "Queue:My_Topic_Queue02|data:" + detail + "}");
}
}
|