10-Spring Boot整合RabbitMQ
?
简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
-
application.yml文件配置RabbitMQ相关信息;
-
在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
-
注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
-
application.yml文件配置RabbitMQ相关信息
-
创建消息处理类,用于接收队列中的消息并进行处理
搭建生产者工程
1.创建工程
创建生产者工程 springboot-rabbitmq-producer
2. 添加依赖
修改pom.xml文件内容为如下:
<?xml?version="1.0"?encoding="UTF-8"?>
<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<!--?springboot?父工程???-->
????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.6.4</version>
????????<relativePath/>?<!--?lookup?parent?from?repository?-->
????</parent>
????<!--?工程坐标???-->
????<groupId>com.lijw</groupId>
????<artifactId>springboot-rabbitmq-producer</artifactId>
????<version>0.0.1-SNAPSHOT</version>
????<!--?工程信息???-->
????<name>springboot-rabbitmq-producer</name>
????<description>Demo?project?for?Spring?Boot</description>
????<properties>
????????<java.version>1.8</java.version>
????</properties>
????<!--?工程依赖???-->
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter</artifactId>
????????</dependency>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????????<scope>test</scope>
????????</dependency>
????????<!--?rabbitmq起步依赖???-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-amqp</artifactId>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????</plugin>
????????</plugins>
????</build>
</project>
3. 启动类
package?com.lijw.springbootrabbitmqproducer;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public?class?SpringbootRabbitmqProducerApplication?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(SpringbootRabbitmqProducerApplication.class,?args);
????}
}
4. 配置RabbitMQ
1)配置文件
创建application.yml,内容如下:
spring:
??rabbitmq:
????host:?localhost
????port:?5672
????virtual-host:?/
????username:?guest
????password:?guest
2)绑定交换机和队列
创建RabbitMQ队列与交换机绑定的配置类com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig
package?com.lijw.springbootrabbitmqproducer.config;
import?org.springframework.amqp.core.*;
import?org.springframework.beans.factory.annotation.Qualifier;
import?org.springframework.context.annotation.Bean;
import?org.springframework.context.annotation.Configuration;
@Configuration
public?class?RabbitMQConfig?{
????//?交换机名称
????public?static?final?String?ITEM_TOPIC_EXCHANGE?=?"item_topic_exchange";
????//?队列名称
????public?static?final?String?ITEM_QUEUE?=?"item_queue";
????//?声明交换机
????@Bean("itemTopicExchange")
????public?Exchange?topicExchange(){
????????/**
?????????*?ExchangeBuilder
?????????*?-?topicExchange(ITEM_TOPIC_EXCHANGE)?设置?topic?模式的交换机名称
?????????*?-?durable(true)?设置持久化
?????????*?-?build()?构建返回?Exchange?对象
?????????*/
????????return?ExchangeBuilder.topicExchange(ITEM_TOPIC_EXCHANGE).durable(true).build();
????}
????//?声明队列
????@Bean("itemQueue")
????public?Queue?itemQueue(){
????????/**
?????????*?QueueBuilder
?????????*?-?.durable(ITEM_QUEUE)?设置队列名称以及持久化
?????????*?-?.build()?构建返回?Queue?对象
?????????*/
????????return?QueueBuilder.durable(ITEM_QUEUE).build();
????}
????//?绑定队列与交换机
????@Bean
????public?Binding?itemQueueExchange(@Qualifier("itemQueue")?Queue?queue,
?????????????????????????????????????@Qualifier("itemTopicExchange")?Exchange?exchange){
????????/**
?????????*?BindingBuilder
?????????*?-?.bind(queue)?绑定队列
?????????*?-?.to(exchange)?绑定交换机
?????????*?-?with("item.#")?设置?Routing?Key
?????????*?-?.noargs()?设置无参数
?????????*/
????????return?BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
????}
}
5.测试发送消息
在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:
package?com.lijw.springbootrabbitmqproducer;
import?com.lijw.springbootrabbitmqproducer.config.RabbitMQConfig;
import?org.junit.jupiter.api.Test;
import?org.springframework.amqp.rabbit.core.RabbitTemplate;
import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public?class?RabbitMQTest?{
????@Autowired
????private?RabbitTemplate?rabbitTemplate;
????//?生产者发送消息
????@Test
????public?void?testSend()?{
????????/**
?????????*?1.?参数一:?交换机名称
?????????*?2.?参数二:?routingKey
?????????*?3.?参数三:?发送的消息
?????????*/
????????rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,?"item.insert",?"商品新增,routing?key?为item.insert");
????????rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,?"item.update",?"商品修改,routing?key?为item.update");
????????rabbitTemplate.convertAndSend(RabbitMQConfig.ITEM_TOPIC_EXCHANGE,?"item.delete",?"商品删除,routing?key?为item.delete");
????}
}
运行上述测试程序(交换机和队列才能先被声明和绑定),执行如下:
执行完毕后, 可以在RabbitMQ的管理控制台中查看到交换机与队列的绑定:
在上面我们已经确认了消息写入了队列,下面我们来编写消费者工程进行消费。
搭建消费者工程
1. 创建工程
创建消费者工程 springboot-rabbitmq-consumer
2. 添加依赖
修改pom.xml文件内容为如下:
<?xml?version="1.0"?encoding="UTF-8"?>
<project?xmlns="http://maven.apache.org/POM/4.0.0"?xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
?????????xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd">
????<modelVersion>4.0.0</modelVersion>
????<!--?springboot?父工程??-->
????<parent>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-parent</artifactId>
????????<version>2.6.4</version>
????????<relativePath/>?<!--?lookup?parent?from?repository?-->
????</parent>
????<!--?工程坐标???-->
????<groupId>com.lijw</groupId>
????<artifactId>springboot-rabbitmq-consumer</artifactId>
????<version>0.0.1-SNAPSHOT</version>
????<!--?工程信息???-->
????<name>springboot-rabbitmq-consumer</name>
????<description>Demo?project?for?Spring?Boot</description>
????<properties>
????????<java.version>1.8</java.version>
????</properties>
????<!--?工程依赖???-->
????<dependencies>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter</artifactId>
????????</dependency>
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-test</artifactId>
????????????<scope>test</scope>
????????</dependency>
????????<!--?rabbitmq的起步依赖??-->
????????<dependency>
????????????<groupId>org.springframework.boot</groupId>
????????????<artifactId>spring-boot-starter-amqp</artifactId>
????????</dependency>
????</dependencies>
????<build>
????????<plugins>
????????????<plugin>
????????????????<groupId>org.springframework.boot</groupId>
????????????????<artifactId>spring-boot-maven-plugin</artifactId>
????????????</plugin>
????????</plugins>
????</build>
</project>
3. 启动类
package?com.lijw.springbootrabbitmqconsumer;
import?org.springframework.boot.SpringApplication;
import?org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public?class?SpringbootRabbitmqConsumerApplication?{
????public?static?void?main(String[]?args)?{
????????SpringApplication.run(SpringbootRabbitmqConsumerApplication.class,?args);
????}
}
4. 配置RabbitMQ
创建application.yml,内容如下:
spring:
??rabbitmq:
????host:?localhost
????port:?5672
????virtual-host:?/
????username:?guest
????password:?guest
5. 消息监听处理类
编写消息监听器 com.lijw.springbootrabbitmqconsumer.listener.MyListener
package?com.lijw.springbootrabbitmqconsumer.listener;
import?org.springframework.amqp.rabbit.annotation.RabbitListener;
import?org.springframework.stereotype.Component;
@Component
public?class?MyListener?{
????/**
?????*?监听某个队列的消息
?????*?@param?message?接收到的消息
?????*/
????@RabbitListener(queues?=?"item_queue")
????public?void?myListener1(String?message){
????????System.out.println("消费者接收到的消息:?"?+?message);
????}
}
6. 测试接收消息
启动消费者工程,查看接收到的消息:
我们可以在生产者工程多发送几次消息看看,如下:
到这里,我们已经确认消费者能够正常接收消息了。