IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Spring Boot整合RabbitMQ由浅入深完整讲解 -> 正文阅读

[大数据]Spring Boot整合RabbitMQ由浅入深完整讲解

Spring Boot整合RabbitMQ由浅入深完整讲解

本文需要读者对Spring Boot整合RabbitMQ有基本了解

环境介绍

软件名称软件版本
Spring Boot2.5.3
Maven3.6.3
RabbitMQ Server3.9.1
erlang24.0.5

配置单个RabbitMQ

先来一个最最简单的示例

application.properties配置文件中增加如下配置

spring.rabbitmq.host: 127.0.0.1
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest

创建一个配置类RabbitmqConfiguration.java,并且在这个类的上面使用@Configuration注解,我们用这个配置类来初始化MQ相关的配置

@Configuration
public class RabbitmqConfiguration {

}

声明队列

	@Bean
	public Queue mqQueue1() {
		String queueName = "mq-queue-1";
		return new Queue(queueName, true, false, false, new HashMap<>());
	}

声明交换机

	@Bean
	public Exchange mqExchage1() {
		String exchangeName = "mq-exchange-1";
        return new FanoutExchange(exchangeName, true, false, new HashMap<>());
    }

声明绑定

	@Bean
	public Binding bindQueue1AndExchange1(
			@Qualifier("mqQueue1") Queue mqQueue1,
			@Qualifier("mqExchage1") Exchange mqExchage1) {
		
        return BindingBuilder.bind(mqQueue1).to(mqExchage1).with("").noargs();
    }

定义放入MQ的消息数据结构

@Data
public class MqMessage {

	/**主键*/
    private String msgId;

	/**发送人*/
    private String from;
    /**接收人*/
    private String to;
    /**消息内容*/
    private String content;
    /**发送时间*/
    private long sendTime;
}

发布消息到RabbitMQ

Spring Boot框架在启动后就可以使用@Autowired方式使用MongoTemplate对象

private @Autowired RabbitTemplate rabbitTemplate;

发布消息

String exchange = "mq-exchange-1";
MqMessage mqMessage = buildMessage();
rabbitTemplate.convertAndSend(exchange, "", jsonUtil.toJson(mqMessage));

buildMessage()代码

	public MqMessage buildMessage() {
		MqMessage message = new MqMessage();
		
		message.setMsgId(UUID.randomUUID().toString());
		message.setFrom("张三");
		message.setTo("李四");
		message.setContent("hello, nice to meet you!");
		message.setSendTime(System.currentTimeMillis());
		
		return message;
	}

创建消费者

使用@RabbitListener@RabbitHandler两个注解完成消费者的创建

  • 在类的上面使用@RabbitListener,设置订阅的队列以及ack方式
  • 在方法的上面使用@RabbitHandler

    @RabbitHandler的方法最多可以接收4个参数,分别是

    • 放入RabbitMQ的消息数据,比如String,这个参数是必须要有的,另外三个参数非必须,根据业务场景自己添加
    • org.springframework.messaging.Message
    • org.springframework.amqp.core.Message
    • com.rabbitmq.client.Channel
  • 为了方便写Junit测试,加了一个List存放消费结果
@Component
@RabbitListener(queues = "mq-queue-1", ackMode = SpringRabbitConsumer.ACK_MODE)
@Slf4j
public class SpringRabbitConsumer {
	
	public static final String ACK_MODE = "MANUAL";

	private List<MqMessage> mqMessageList = new ArrayList<>();
	private JsonUtil jsonUtil;
	
	@Autowired
	public SpringRabbitConsumer(JsonUtil jsonUtil) {
		this.jsonUtil = jsonUtil;
	}

	@RabbitHandler
	public void onMessage(String message, Message amqpMessage, Channel channel) throws Exception {
		long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
		
		log.debug("Spring Rabbit Consumer: {}", message);
		
		try {
			mqMessageList.add(jsonUtil.fromJson(message, MqMessage.class));
		} catch (Exception e) {
			log.error("===", e);
		} finally {
			channel.basicAck(deliveryTag, false);
		}
	}

	public List<MqMessage> getMqMessageList() {
		return mqMessageList;
	}
}

编写Junit单元测试

Junit测试类上面增加配置属性

@TestPropertySource(properties = {
		""
		,"spring.rabbitmq.host: 127.0.0.1"
		,"spring.rabbitmq.port: 5672"
		,"spring.rabbitmq.username: guest"
		,"spring.rabbitmq.password: guest"
})

使用RabbitTemplate发布消息到RabbitMQ,然后消费者从RabbitMQ取出消息进行业务逻辑处理,我们简单的把取出来的消息放下一个List

	@Test
	public void test() throws Exception {
		String exchange = "mq-exchange-1";
		// 构造一条业务数据
		MqMessage mqMessage = buildMessage();
		// 发布业务数据到RabbitMQ
		rabbitTemplate.convertAndSend(exchange, "", jsonUtil.toJson(mqMessage));
		// sleep一下,等消费者消费数据
		ThreadUtils.sleep(DurationUtils.toDuration(1000, TimeUnit.MILLISECONDS));
		// 从消费者获取消费结果
		List<MqMessage> consumerResult = springRabbitConsumer.getMqMessageList();
		
		// 校验测试结果
		assertFalse(consumerResult.isEmpty());
		MqMessage lastConsume = consumerResult.get(consumerResult.size()-1);
		assertEquals(mqMessage.getMsgId(), lastConsume.getMsgId());
		
		log.debug("测试通过");
	}

小结

以上就是Spring Boot整合RabbitMQ最简单的一个例子,Spring Boot已经帮我们做了很多事情,我们只需要做很少的配置,使用注解很快就可以完成对RabbitMQ的发布以及订阅。

配置多个RabbitMQ

只连一个RabbitMQ的时候非常简单,但在实际使用中可能需要连两个甚至两个以上的RabbitMQ,那要如何进行配置呢?我们接下来继续学习。

想要配置多个RabbitMQ,我们就需要了解更多Spring Boot相关的原理,学习一下Spring Boot到底帮我们做了哪些事情。

了解RabbitAutoConfiguration

RabbitAutoConfiguration这个配置类,默默的帮我们创建了很多东西,列一些比较重要的Bean

  • CachingConnectionFactory
  • RabbitTemplate
  • AmqpAdmin
  • RabbitMessagingTemplate
  • SimpleRabbitListenerContainerFactoryConfigurer
  • SimpleRabbitListenerContainerFactory

这些Bean的创建,是有前提条件的,有些是@ConditionalOnMissingBean,有些是@ConditionalOnSingleCandidate……

为什么要了解RabbitAutoConfiguration这个类呢?因为配置多个RabbitMQ时会影响到Spring Boot的这些默认行为,同时我们也可以从这个类中学习如何构造需要的东西,这是进入高级应用的学习入口。

多个RabbitMQ配置

我们以两个RabbitMQ为例,一个以default命名,一个以second命名。

在配置文件中追加配置如下

spring.rabbitmq.second.host: 127.0.0.1
spring.rabbitmq.second.port: 5672
spring.rabbitmq.second.username: guest
spring.rabbitmq.second.password: guest

继续改造RabbitmqConfiguration.java

上面已经在RabbitmqConfiguration类中声明了队列、交换机以及绑定关系

如果要连多个RabbitMQ,就需要手动创建CachingConnectionFactoryRabbitTemplate

读取second配置

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitmqConfiguration {

	// springboot创建的RabbitProperties
	@Qualifier("spring.rabbitmq-org.springframework.boot.autoconfigure.amqp.RabbitProperties")
	private @Autowired RabbitProperties defaultRabbitProperties;

	// 第2个RabbitMQ的配置
	private RabbitProperties second;
	
	public void setSecond(RabbitProperties second) {
		this.second = second;
	}
}

创建队列、交换机、绑定

根据mqQueue1的创建方式,再创建一个mqQueue2

	@Bean
	public Queue mqQueue2() {
		String queueName = "mq-queue-2";
		return new Queue(queueName, true, false, false, new HashMap<>());
	}
	
	@Bean
	public Exchange mqExchage2() {
		String exchangeName = "mq-exchange-2";
        return new FanoutExchange(exchangeName, true, false, new HashMap<>());
    }
	
	@Bean
	public Binding bindQueue2AndExchange2(
			@Qualifier("mqQueue2") Queue mqQueue2,
			@Qualifier("mqExchage2") Exchange mqExchage2) {
		
        return BindingBuilder.bind(mqQueue2).to(mqExchage2).with("").noargs();
    }

创建ConnectionFactory

	@Bean
    public ConnectionFactory connectionFactory() {
        return createConnectionFactory(defaultRabbitProperties);
    }
	
	@Bean
    public ConnectionFactory secondConnectionFactory() {
        return createConnectionFactory(second);
    }

createConnectionFactory方法的代码如下

	private ConnectionFactory createConnectionFactory(RabbitProperties properties){
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        return connectionFactory;
    }

创建RabbitTemplate

	@Bean
	public RabbitTemplate defaultRabbitTemplate(@Qualifier("connectionFactory") ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		return rabbitTemplate;
	}
	
	@Bean
	public RabbitTemplate secondRabbitTemplate(@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
		return rabbitTemplate;
	}

编写Junit单元测试

Junit测试类上面增加配置属性

@TestPropertySource(properties = {
		""
		,"spring.rabbitmq.host: 127.0.0.1"
		,"spring.rabbitmq.port: 5672"
		,"spring.rabbitmq.username: guest"
		,"spring.rabbitmq.password: guest"
		
		,"spring.rabbitmq.second.host: 127.0.0.1"
		,"spring.rabbitmq.second.port: 5672"
		,"spring.rabbitmq.second.username: guest"
		,"spring.rabbitmq.second.password: guest"
})

在测试类中引用创建的Bean

	@Qualifier("defaultRabbitTemplate")
	private @Autowired RabbitTemplate rabbitTemplate;
	
	@Qualifier("secondRabbitTemplate")
	private @Autowired RabbitTemplate rabbitTemplateSecond;
	
	@Qualifier("springRabbitConsumer")
	private @Autowired SpringRabbitConsumer springRabbitConsumer;
	
	@Qualifier("springRabbitConsumerSecond")
	private @Autowired SpringRabbitConsumer springRabbitConsumerSecond;

开始写测试代码

	@Test
	public void test() throws Exception {
		String exchange = "mq-exchange-1";
		// 构造一条业务数据
		MqMessage mqMessage = buildMessage();
		// 发布业务数据到RabbitMQ
		rabbitTemplate.convertAndSend(exchange, "", jsonUtil.toJson(mqMessage));
		// sleep一下,等消费者消费数据
		ThreadUtils.sleep(DurationUtils.toDuration(500, TimeUnit.MILLISECONDS));
		// 从消费者获取消费结果
		List<MqMessage> consumerResult = springRabbitConsumer.getMqMessageList();
		
		// 校验测试结果
		assertFalse(consumerResult.isEmpty());
		MqMessage lastConsume = consumerResult.get(consumerResult.size()-1);
		assertEquals(mqMessage.getMsgId(), lastConsume.getMsgId());
		
		// =================================
		
		String exchange2 = "mq-exchange-2";
		// 构造一条业务数据
		MqMessage mqMessage2 = buildMessage();
		// 发布业务数据到RabbitMQ
		rabbitTemplateSecond.convertAndSend(exchange2, "", jsonUtil.toJson(mqMessage2));
		// sleep一下,等消费者消费数据
		ThreadUtils.sleep(DurationUtils.toDuration(500, TimeUnit.MILLISECONDS));
		// 从消费者获取消费结果
		List<MqMessage> consumerResult2 = springRabbitConsumerSecond.getMqMessageList();
		
		// 校验测试结果
		assertFalse(consumerResult2.isEmpty());
		MqMessage lastConsume2 = consumerResult2.get(consumerResult2.size()-1);
		assertEquals(mqMessage2.getMsgId(), lastConsume2.getMsgId());
		
		log.debug("测试通过");
	}

RabbitAnnotationDrivenConfiguration介绍

如果直接运行上面的Junit单元测试会发现有一个报错

Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'rabbitListenerContainerFactory' defined in class path resource [org/springframework/boot/autoconfigure/amqp/RabbitAnnotationDrivenConfiguration.class]: Unsatisfied dependency expressed through method 'simpleRabbitListenerContainerFactory' parameter 1; nested exception is org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.amqp.rabbit.connection.ConnectionFactory' available: expected single matching bean but found 2: defaultConnectionFactory,secondConnectionFactory  

我们根据报错信息看看RabbitAnnotationDrivenConfiguration这个类的源码,其中有一段创建Bean的代码如下

	@Bean(name = "rabbitListenerContainerFactory")
	@ConditionalOnMissingBean(name = "rabbitListenerContainerFactory")
	@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
			matchIfMissing = true)
	SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		return factory;
	}

这段代码的意思是如果没有创建SimpleRabbitListenerContainerFactory这个Bean,并且Bean的名字为rabbitListenerContainerFactory这个的话,那么就会创建一个名字为rabbitListenerContainerFactoryBean,在创建的时候会依赖一个ConnectionFactoryBean,但我们定义了两个ConnectionFactory,一个叫defaultConnectionFactory,一个叫secondConnectionFactory,所以报错了。

要解决这个问题,就回到RabbitmqConfiguration配置类,添加如下代码

	@Bean(name = "rabbitListenerContainerFactory")
	SimpleRabbitListenerContainerFactory defaultRabbitListenerContainerFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer,
			@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		configurer.configure(factory, connectionFactory);
		return factory;
	}
	
	@Bean
	SimpleRabbitListenerContainerFactory secondRabbitListenerContainerFactory(
			SimpleRabbitListenerContainerFactoryConfigurer configurer,
			@Qualifier("secondConnectionFactory") ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		configurer.configure(factory, connectionFactory);
		return factory;
	}

这样我们手动创建两个SimpleRabbitListenerContainerFactoryBean,并且把defaultRabbitListenerContainerFactoryBean名字声明为rabbitListenerContainerFactory。这样一来,RabbitAnnotationDrivenConfiguration里面就不会再自己去创建SimpleRabbitListenerContainerFactory了。

小结

以上就是Spring Boot整合RabbitMQ连接多个RabbitMQ的例子,从代码上可以看出,与连接单个RabbitMQ相比,需要了解Spring Boot关于RabbitMQ的更多原理,也需要我们手工创建更多的Bean

Spring BootRabbitProperties类会帮我们读取一个RabbitMQ配置,我们自己完成对second配置的读取,如果需要连接第3个、第4个甚至更多RabbitMQ,只需要模仿second示例即可。

@RabbitListener如何监听指定的RabbitMQ

上面给出连1个RabbitMQ的例子和连多个RabbitMQ的例子,那么就有细心的朋友发现了,虽然在SpringRabbitConsumer类中可以使用@RabbitListener注解实现订阅功能,但如果配置了多个RabbitMQ,怎么知道订阅的是哪个RabbitMQ,可以按自己的意愿指定订阅的RabbitMQ吗?

答案是肯定的,我们继续学习@RabbitListener这个注解

@RabbitListener这个注解有一个属性containerFactory,这个属性正好是我们解决前面提到的UnsatisfiedDependencyException这个异常而手动创建的两个Bean,分别是rabbitListenerContainerFactorysecondRabbitListenerContainerFactory,这样就把@RabbitListenerRabbitMQ关联起来了

@Component
@RabbitListener(queues = "mq-queue-1", ackMode = SpringRabbitConsumer.ACK_MODE, containerFactory = "rabbitListenerContainerFactory")
@Slf4j
public class SpringRabbitConsumer {

}

@Component
@RabbitListener(queues = "mq-queue-2", ackMode = SpringRabbitConsumerSecond.ACK_MODE, containerFactory = "secondRabbitListenerContainerFactory")
@Slf4j
public class SpringRabbitConsumerSecond extends SpringRabbitConsumer {
	
}

我们通过RabbitMQ控制台看看修改后的效果

RabbitMQ两个不同连接

两个消费者分别由两个不同的ContainerFactory管理,ContainerFactory对应的就是连接的RabbitMQ

总结

以上就是在Srping Boot框架中使用RabbitMQ的多数场景的例子,后面还会写更多关于RabbitMQ高级应用,欢迎留言提问,一起交流,一起学习。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-12 16:40:01  更:2021-08-12 16:42:34 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:54:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码