想要了解springboot中怎么使用RabbitMQ,关键是知道springboot提供的RabbitAutoConfiguration自动配置类中配置了哪些RabbitMQ的组件到spring容器中。查看发现其中自动配置了ConnectionFactory,RabbitTemplate和AmqpAdmin这三个关键组件。
最简单的配置文件application.properties
#rabbitmq配置
spring.rabbitmq.host=192.168.228.138
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=test
springboot会根据配置文件的信息配置ConnectionFactory组件,此时如果已经配置了交换机和队列的绑定关系,就已经能够自动注入RabbitTemplate组件并使用。
AmqpAdmin管理员
从RabbitAutoConfiguration自动配置类中可以知道,容器中的AmqpAdmin其实是一个RabbitAdmin,负责声明RabbitMQ中的交换机,队列以及绑定关系。可以自己利用它来进行声明,也可以让它自动帮我们声明。
第一种方式,用它来进行声明,因为容器中已经存在,那么直接使用就行了
@Configuration
public class RabbitConfig {
@Autowired
public void rabbitAdmin(AmqpAdmin amqpAdmin){
DirectExchange directExchange = new DirectExchange("myDirectExchange", true, false);
Queue jinjinQueue = new Queue("jinjinQueue", true, false, false);
Queue heiheiQueue = new Queue("heiheiQueue", true, false, false);
Binding binding1 = BindingBuilder.bind(jinjinQueue).to(directExchange).with("jinjin");
Binding binding2 = BindingBuilder.bind(heiheiQueue).to(directExchange).with("heihei");
amqpAdmin.declareExchange(directExchange);
amqpAdmin.declareQueue(jinjinQueue);
amqpAdmin.declareQueue(heiheiQueue);
amqpAdmin.declareBinding(binding1);
amqpAdmin.declareBinding(binding2);
}
}
第二种方式,让它自动帮我们声明
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("myDirectExchange",true,false);
}
@Bean
public Queue jinjinQueue(){
return new Queue("jinjinQueue",true,false,false);
}
@Bean
public Queue heiheiQueue(){
return new Queue("heiheiQueue",true,false,false);
}
@Bean
public Binding binding1(){
return BindingBuilder.bind(jinjinQueue()).to(directExchange()).with("jinjin");
}
@Bean
public Binding binding2(){
return BindingBuilder.bind(heiheiQueue()).to(directExchange()).with("heihei");
}
}
这样方式和第一种方式最大的区别就是没有使用amqpAdmin去声明exchange、Queue和Binding,而是直接将这些对象放置到容器中。 因为容器中有这些对象,因此amqpAdmin可以在容器启动过程中从容器中获取到这些对象并帮我们进行声明 amqpAdmin只是一个接口,真正帮我们声明的类其实是它的实现类RabbitAdmin
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
BeanNameAware, InitializingBean {
......
}
可以看到,RabbitAdmin实现了AmqpAdmin接口,同时还实现了ApplicationContextAware接口,代表它能获取到spring容器,还实现了InitializingBean接口,这个接口提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。 声明交换机,队列和绑定关系这些工作就是在afterPropertiesSet方法中通过容器来获取交换机、队列、绑定关系这些对象,然后帮我们自动进行声明。
RabbitMQ最重要的就是生产者和消费者的编写了
生产者
RabbitTemplate就是一个生产者,可以利用它来发送消息。并且springboot已经帮我们配置到容器中了,如果不使用RabbitMQ的高级特性,就能直接注入它来使用。
其中最常用的方法是send和convertAndSend方法 它们的区别是send方法需要自己创建message对象,如果不需要多余参数,使用convertAndSend会更好,它会帮我们将传进去的消息封装成Message对象,结果是一样的。
@Test
public void test() {
String msg = "Hello jinjin!";
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("1500");
Message message = new Message(msg.getBytes(),messageProperties);
rabbitTemplate.send("myDirectExchange","jinjin",message);
rabbitTemplate.convertAndSend(交换机名称:"myDirectExchange",路由key:"jinjin",要发送的消息: "Hello jinjin!");
}
convertAndSend方法内部
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
}
this.convertMessageIfNecessary(object)能将我们传进去的消息转换成Message,这样就和send方法一样了,最终底层还是使用ConnectionFactory创建的channel的basciPublish方法为我们发送消息。
消费者
相比于生产者,消费者比较灵活,可以使用AbstractMessageListenerContainer的子类对象监听队列,也可以使用@RabbitListener注解来进行监听队列。
第一种方式,AbstractMessageListenerContainer方式 这个抽象类一共有三种实现,这里使用SimpleMessageListenerContainer类来举例,由于有三个实现类,springboot不知道我们要使用哪一个实现类,因此需要我们自己使用配置类去配置,然后放到spring容器中。
@Bean
public SimpleMessageListenerContainer reviceMessage(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames("jinjinQueue","heiheiQueue");
simpleMessageListenerContainer.setConcurrentConsumers(3);
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
simpleMessageListenerContainer.setPrefetchCount(5);
simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
System.out.println("如果监听多条队列,哪一条队列过来的信息,可以用switch做进一步处理"+message.getMessageProperties().getConsumerQueue());
System.out.println(new String(message.getBody()));
System.out.println(message.getMessageProperties());
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
});
return simpleMessageListenerContainer;
}
simpleMessageListenerContainer可以设置监听的参数,其中setMessageListener方法是设定一个MessageListener参数的对象,最终调用这个对象的onMessage方法来消费消息。不过消息还是由channel.basicConsume方法进行接收的。
但配置信息写在代码中就不灵活了,因此可以在springboot的配置文件中配置信息,然后再注入这个SimpleMessageListenerContainer
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=2
因为我们使用的是SimpleMessageListenerContainer对象,因此配置文件要用simple的然后再配置文件中注入配置信息就可以了。
@Bean
@ConfigurationProperties("spring.rabbitmq.listener.simple")
public SimpleMessageListenerContainer reviceMessage(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueueNames("jinjinQueue","heiheiQueue");
simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
});
return simpleMessageListenerContainer;
}
使用@ConfigurationProperties(“spring.rabbitmq.listener.simple”)就简洁多了。
第二种方式,@RabbitListener注解方式 @RabbitListener注解方式同样也是需要像第一种方式一样配置一下监听参数的,可以给容器注入一个RabbitListenerContainerFactory类对象进行配置。
@Bean
public RabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setPrefetchCount(3);
return factory;
}
配置完成后只需要编写一个监听类就可以了
@Component
@RabbitListener(queues = "jinjinQueue")
public class QueueConsumer {
@RabbitHandler
public void reviceMessage(@Payload Message message,Channel channel,byte[] msg){
try {
System.out.println("rabbitMQ已经接收到信息: " + msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
} catch (IOException ioException) {
ioException.printStackTrace();
}
e.printStackTrace();
}
}
}
@RabbitListener(queues = “jinjinQueue”)指定监听哪一条队列,@RabbitHandler指定消息处理的方法,会自动传@Payload Message message,Channel channel,byte[] msg这三个参数给我们使用,其中msg的类型为我们发送的消息的类型。前面两个参数不需要可以不写。
这样,ConnectionFactory,RabbitTemplate和AmqpAdmin的使用方法都介绍完了,如果不利用RabbitMQ高级特性,已经可以很好的使用它们来处理业务了。需要注意的是,ConnectionFactory,RabbitTemplate和AmqpAdmin都是可以自定义的,也就是使用自己创建的对象,不使用springboot提供的对象,不过配置文件会失效。
RabbitMQ的高级特性配置方式
但如果想要更好的使用RabbitMQ,肯定是要使用它的高级特性的,包括发送端确认机制,消息返回机制,消费端确认机制,消费端限流机制以及消息过期机制。 要使用这些机制是需要进行配置的,这里介绍它们的配置方式,这样如果已经会理论的话就能很好的使用了。
第一个机制,发送端确认机制 一旦消息发送到RabbitMQ服务器中,就会触发这个机制 首先在配置文件中开启这个机制
#生产端打开消息确认机制
spring.rabbitmq.publisher-confirm-type=correlated
然后在RabbitTemplate中编写处理代码,由于RabbitTemplate已经存在容器中,可以直接增强它
@Configuration
public class RabbitConfig {
@Autowired
public void enhanceRabbitTemplate(RabbitTemplate rabbitTemplate){
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
});
}
}
这样就成功开启消息确认机制了
第二个机制,消息返回机制 当RabbitTemplate发送消息到RabbitMQ服务器中后,如果找不到这个交换机,或者找到交换机,但是找不到队列,就会触发消息返回机制。 首先在配置文件中开启这个机制
#开启生产端消息返回机制
spring.rabbitmq.publisher-returns=true
和消息确认机制一样,在RabbitTemplate中编写处理代码
@Configuration
public class RabbitConfig {
@Autowired
public void enhanceRabbitTemplate(RabbitTemplate rabbitTemplate){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returned -> {
});
}
}
第三个机制,消费端确认机制 前提条件,将确认方式改为手动确认,默认是自动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
开启手动确认后,消费端需要手动确认,在手动确认之前,消息将会是Unacked状态,在项目重新启动或RabbitMQ服务器重启后,消息将会变成Ready的状态。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
第四个机制,消费端限流机制 需要配合消费端确认机制一起使用,开启消费端手动确认 开启方式
spring.rabbitmq.listener.direct.prefetch=5
simpleMessageListenerContainer.setPrefetchCount(5);
factory.setPrefetchCount(5);
如果队列的Unacked值大于设定值,则消息暂存队列中,等待消费者消费完消息确认后再消费下一条消息
第五个机制,消息过期机制 可以为队列或者某一个消息设置过期时间,如果队列中的消息过期了还未被处理,则消息会被丢弃,造成数据丢失,因此通常配合死信队列一起使用。 在创建队列的时候配置过期时间
创建死信交换机和队列
@Configuration
public class DeadConfig {
@Bean
public DirectExchange DeadDirectExchange(){
return new DirectExchange("Dead_direct_exchange",true,false);
}
@Bean
public Queue DeadDirectQueue(){
return new Queue("DeadDirectQueue",true,false,false);
}
@Bean
public Binding DeadDirectBinding1(){
return BindingBuilder.bind(DeadDirectQueue()).to(DeadDirectExchange()).with("dead");
}
}
将消息过期的队列设置上死信队列的交换机以及路由key即可
@Configuration
public class TTLConfig {
@Bean
public DirectExchange TTLDirectExchange(){
return new DirectExchange("ttl_direct_exchange",true,false);
}
@Bean
public Queue TTLDirectQueue(){
Map<String,Object> map = new HashMap<>();
map.put("x-message-ttl",5000);
map.put("x-dead-letter-exchange","Dead_direct_exchange");
map.put("x-dead-letter-routing-key","dead");
return new Queue("TTLDirectQueue",true,false,false,map);
}
@Bean
public Binding ttlDirectBinding1(){
return BindingBuilder.bind(TTLDirectQueue()).to(TTLDirectExchange()).with("ttl");
}
}
之后实现监听就可以了。
|