前言
提示:Springboot集成Rabbitmq实战案例,通过接口调用的方式演示。
提示:以下是本篇文章正文内容,下面案例可供参考
一、集成步骤
一、生产者:
-
创建生产者SpringBoot工程 -
引入pom依赖 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
-
编写rabbitmq配置消息 -
定义交换机,队列以及绑定关系的配置类 -
注入RabbitTemplate,调用方法,完成消息发送
二、消费者:
-
创建生产者SpringBoot工程 -
引入pom依赖 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
-
编写rabbitmq配置消息 -
定义监听类,使用@RabbitListener注解完成队列监听。
二、实现步骤
1.项目架构图
2.创建项目
代码如下(示例):
1.pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sky</groupId>
<artifactId>springboot-rabbitmq-module</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-module</name>
<description>springboot-rabbitmq-module</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>springboot_rabbitmq</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.application.properties配置
server.port=8080
#spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=110.42.239.246
spring.rabbitmq.virtual-host=springboot
#spring.rabbitmq.addresses=110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672
说明:这里免费提供rabbitmq连接方式给大家使用学习
3.config配置
HelloWorldConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
}
FanoutConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public Queue fanoutQ1() {
return new Queue("fanout.q1");
}
@Bean
public Queue fanoutQ2() {
return new Queue("fanout.q2");
}
@Bean
public FanoutExchange setFanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding bindQ1() {
return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
}
@Bean
public Binding bindQ2() {
return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
}
}
WorkConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WorkConfig {
@Bean
public Queue workQ1() {
return new Queue("work_sb_mq_q");
}
}
DirectConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfig {
@Bean
public Queue directQ1() {
return new Queue("direct_sb_mq_q1");
}
@Bean
public Queue directQ2() {
return new Queue("direct_sb_mq_q2");
}
@Bean
public DirectExchange setDirectExchange() {
return new DirectExchange("directExchange");
}
@Bean
public Binding bindDirectBind1() {
return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("directBind.one");
}
@Bean
public Binding bindDirectBind2() {
return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("directBind.two");
}
}
TopicConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicConfig {
@Bean
public Queue topicQ1() {
return new Queue("topic_sb_mq_q1");
}
@Bean
public Queue topicQ2() {
return new Queue("topic_sb_mq_q2");
}
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topicExchange");
}
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("directBind.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.two");
}
}
4.消费端component
package com.sky.springbootrabbitmqmodule.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConcumerReceiver {
@RabbitListener(queues="helloWorldqueue")
public void helloWorldReceive(String message) {
System.out.println("helloWorld模式 received message : " +message);
}
@RabbitListener(queues="work_sb_mq_q")
public void wordQueueReceiveq1(String message) {
System.out.println("工作队列模式1 received message : " +message);
}
@RabbitListener(queues="work_sb_mq_q")
public void wordQueueReceiveq2(String message) {
System.out.println("工作队列模式2 received message : " +message);
}
@RabbitListener(queues="fanout.q1")
public void fanoutReceiveq1(String message) {
System.out.println("发布订阅模式1received message : " +message);
}
@RabbitListener(queues="fanout.q2")
public void fanoutReceiveq2(String message) {
System.out.println("发布订阅模式2 received message : " +message);
}
@RabbitListener(queues="direct_sb_mq_q1")
public void routingReceiveq1(String message) {
System.out.println("Routing路由模式routingReceiveqOne received message : " +message);
}
@RabbitListener(queues="direct_sb_mq_q2")
public void routingReceiveq2(String message) {
System.out.println("Routing路由模式routingReceiveqTwo received message : " +message);
}
@RabbitListener(queues="topic_sb_mq_q1")
public void topicReceiveq1(String message) {
System.out.println("Topic模式 topic_sb_mq_q1 received message : " +message);
}
@RabbitListener(queues="topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message);
}
}
5.生产者controller
package com.sky.springbootrabbitmqmodule.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(value="/helloWorldSend")
public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
@GetMapping(value="/workqueueSend")
public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
for (int i = 0; i <10 ; i++) {
rabbitTemplate.send("work_sb_mq_q", new Message(message.getBytes("UTF-8"),messageProperties));
}
return "message sended : "+message;
}
@GetMapping(value="/fanoutSend")
public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
@GetMapping(value="/directSend")
public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
if(null == routingKey) {
routingKey="directBind.one";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
@GetMapping(value="/topicSend")
public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
if(null == routingKey) {
routingKey="directBind.one";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
}
上面就是这个项目的所有代码了,下面就是Demo演示内容。
三、演示步骤
1.启动项目
2.调用接口演示
1.直连模式
1.接口调用
2.控制台打印
2.工作队列模式
1.接口调用
2.控制台打印
3.发布订阅模式(交换机类型:fanout)
1.接口调用
2.控制台打印
4.路由工作模式(交换机类型:direct)
1.接口调用
2.控制台打印
5.通配符模式(交换机类型:topic)
1.接口调用
2.控制台打印
除此之外,我还提供了项目地址提供给大家clone,地址链接:https://gitee.com/java_wxid/liao
总结
提示:以上就是今天要讲的内容,本文介绍了Springboot如何快速集成Rabbitmq,提供了五种模式的Demo案例演示给大家参考,希望对大家有所帮助。
|