本文主要分享RabbitMQ exchange类型的功能和使用、RabbitMQ延时队列、一个springboot服务发送消息到多虚拟主机
1.RabbitMQ exchange
exchange交换机,负责分发消息,为解决消息不同的业务场景,也提供了不同的交换机类型。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
addresses: 192.168.0.221:5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual
2.DirectExchange
默认交换机,一对一,默认下根据队列名下发,有routerKey时根据key下发
2.1 java中实现
@Bean
public boolean createDirectQueue(@Autowired ConnectionFactory connectionFactory) {
String exchange = "direct_exchange";
String queue = "direct_queue";
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new DirectExchange(exchange));
rabbitAdmin.declareQueue(new Queue(queue));
rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,
exchange, "", null));
return true;
}
@GetMapping("/send")
public String send() {
JSONObject object = new JSONObject();
object.put("hello", "22222");
rabbitTemplate.convertAndSend("direct_exchange", "", object.toJSONString());
return "success";
}
@RabbitHandler
@RabbitListener(queues = {"direct_queue"})
public void onMessage(Message message, Channel channel) throws Exception {
String msgBodyString = new String(message.getBody());
JSONObject json = JSONObject.parseObject(msgBodyString);
log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2.2 测试日志
调用接口发送消息到RabbitMQ,查看控制台日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},direct_queue
消费确认后就会被删除,只能消费一次
3.FanoutExchange
广播消息,所有队列都发,所有队列消费完毕后删除,如无消费者会保存在队列(开启持久化)等待一个消费者消费后删除
3.1 java中实现
@Bean
public boolean createFanoutQueue(@Autowired ConnectionFactory connectionFactory) {
String exchange = "fanout_exchange";
String queue = "fanout_queue";
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new FanoutExchange(exchange));
for (int i = 0; i < 2; i++) {
rabbitAdmin.declareQueue(new Queue(queue + i));
rabbitAdmin.declareBinding(new Binding(queue + i, Binding.DestinationType.QUEUE,
exchange, "", null));
}
return true;
}
@GetMapping("/send")
public String send() {
JSONObject object = new JSONObject();
object.put("hello", "22222");
rabbitTemplate.convertAndSend("fanout_exchange", "", object.toJSONString());
return "success";
}
@RabbitHandler
@RabbitListener(queues = {"fanout_queue0", "fanout_queue1"})
public void fonoutOnMessage(Message message, Channel channel) throws Exception {
String msgBodyString = new String(message.getBody());
JSONObject json = JSONObject.parseObject(msgBodyString);
log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
3.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},fanout_queue1
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},fanout_queue0
一条消息会广播到两个队列
4.TopicExchange
通配符模糊匹配routerKey,满足条件就发送
4.1 java中实现
@Bean
public boolean createTopicQueue(@Autowired ConnectionFactory connectionFactory) {
String exchange = "topic_exchange";
String queue = "topic_queue";
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new TopicExchange(exchange));
rabbitAdmin.declareQueue(new Queue(queue + "_before"));
rabbitAdmin.declareBinding(new Binding(queue + "_before", Binding.DestinationType.QUEUE,
exchange, "*." + queue, null));
rabbitAdmin.declareQueue(new Queue(queue + "_after"));
rabbitAdmin.declareBinding(new Binding(queue + "_after", Binding.DestinationType.QUEUE,
exchange, queue + ".*", null));
return true;
}
rabbitTemplate.convertAndSend("topic_exchange", "video.topic_queue", object.toJSONString());
rabbitTemplate.convertAndSend("topic_exchange", "topic_queue.music", object.toJSONString());
4.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},topic_queue_before
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"22222"},topic_queue_after
经测试,均能正常消费
5.HeadersExchange
可以配置多个key-value形式的密钥,生产者在头部配置任意一个key-value即可发送到对应的队列
5.1 java中实现
@Bean
public boolean createHeaderQueue(@Autowired ConnectionFactory connectionFactory) {
String exchange = "header_exchange";
String queue = "header_queue";
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new HeadersExchange(exchange));
Map<String, Object> map = new HashMap<>();
map.put("header_key1", "12345");
map.put("header_key2", "123456");
rabbitAdmin.declareQueue(new Queue(queue));
rabbitAdmin.declareBinding(
BindingBuilder.bind(new Queue(queue))
.to(new HeadersExchange(exchange))
.whereAny(map).match());
return true;
}
object.put("hello", "header_key1");
rabbitTemplate.convertAndSend("header_exchange", null,
MessageBuilder.withBody(object.toJSONString().getBytes())
.setHeader("header_key1", "12345").build());
object.put("hello", "header_key2");
rabbitTemplate.convertAndSend("header_exchange", null,
MessageBuilder.withBody(object.toJSONString().getBytes())
.setHeader("header_key2", "123456").build());
5.2 测试日志
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"header_key1"},header_queue
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"header_key2"},header_queue
能正常消费到发送的内容
6.RabbitMQ延时队列
6.1 延时队列原理说明
RabbitMQ提供消息过期队列,一条消息在指定时候后没有被消费掉则会被定义为过期,过期的消息可以通过配置转到其他的交换机去,如不配置则直接抛掉。 那么开发实现过程为:
-
新建delayed_exchange_ttl 和delayed_queue_ttl 队列,通过配置设置消息存活时间和过期以后存放的死信队列。发送需要延迟的消息到该交换机,该交换机下的队列无消费者。 -
成为死信一般有以下几种情况: 消息被拒绝(basic.reject or basic.nack)且带requeue=false参数 消息的TTL-存活时间已经过期 队列长度限制被超越(队列满) -
新建delayed_exchange 和delayed_queue 队列,存放已过期消息的死信队列,延时消息的消费者监听该队列
6.2 Java中实现
@Bean
public boolean createDelayedQueue(@Autowired ConnectionFactory connectionFactory) {
String exchange = "delayed_exchange";
String queue = "delayed_queue";
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.declareExchange(new DirectExchange(exchange + "_ttl"));
Map<String, Object> map = new HashMap<>();
map.put("x-message-ttl", 30000);
map.put("x-dead-letter-exchange", "delayed_exchange");
map.put("x-dead-letter-routing-key", "delayed_exchange_key");
rabbitAdmin.declareQueue(new Queue(queue + "_ttl", true, false, true, map));
rabbitAdmin.declareBinding(new Binding(queue + "_ttl", Binding.DestinationType.QUEUE,
exchange + "_ttl", "", null));
rabbitAdmin.declareExchange(new DirectExchange(exchange));
rabbitAdmin.declareQueue(new Queue(queue));
rabbitAdmin.declareBinding(new Binding(queue, Binding.DestinationType.QUEUE,
exchange, "delayed_exchange_key", null));
return true;
}
log.info("发送消息:{}",new Date().getTime());
rabbitTemplate.convertAndSend("delayed_exchange_ttl", null, object.toJSONString());
- 最终队列的图如下,ttl队列中的消息达到指定时间后会转存到
delayed_queue 队列去 - 消费者代码
@RabbitHandler
@RabbitListener(queues = {"delayed_queue"})
public void onMessage(Message message, Channel channel) throws Exception {
String msgBodyString = new String(message.getBody());
JSONObject json = JSONObject.parseObject(msgBodyString);
log.info("消费消息:{},{},{}", json,
message.getMessageProperties().getConsumerQueue(),new Date().getTime());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
6.3 测试日志
c.e.rabbitmq.controller.Controller : 发送消息:1655967316743
c.e.rabbitmq.consum.RabbitmqConsume : 消费消息:{"hello":"1234567"},delayed_queue,1655967344564
总体来说是实现了延时功能,但是延时的时间有误差,只适合不是很重要的场景使用
需要注意的一个点: 队列一经创建,不可再次修改。 至于延时时间动态的,可以先注解@Autowired ConnectionFactory connectionFactory ,然后再通过生产者声明创建指定时间的队列。
7.多虚拟主机实现
使用场景:业务服务需要发送消息到用户和订单服务,但是这两个服务又是不一样的虚拟主机下。这种情况就只能通过代码进行连接了。
7.1 生产者
配置我就省略了,反正不用自动装配就行
@Bean("orderPushRabbit")
public RabbitTemplate orderPushRabbit() {
CachingConnectionFactory rabbitFactory = new CachingConnectionFactory();
rabbitFactory.setAddresses(amqpAddress);
rabbitFactory.setChannelCacheSize(Runtime.getRuntime().availableProcessors() * 2);
rabbitFactory.setUsername(amqpUserName);
rabbitFactory.setPassword(amqpPassword);
rabbitFactory.setVirtualHost(vhost);
RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitFactory);
MessageConverter serializerMessageConverter = new SerializerMessageConverter();
rabbitTemplate.setMessageConverter(serializerMessageConverter);
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(routingKey);
return rabbitTemplate;
}
如果有多个虚拟主机的复制上述配置即可,需要注意配置的不同即可。 生产者代码中使用
@Autowired
@Qualifier("orderPushRabbit")
RabbitTemplate PushRabbit;
@Autowired
@Qualifier("userPushRabbit")
RabbitTemplate userPushRabbit;
7.2 消费者
根据虚拟主机声明不同的连接工厂
@Bean(name = "orderConnectionFactory")
public ConnectionFactory orderConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(amqpAddress);
connectionFactory.setUsername(amqpUserName);
connectionFactory.setPassword(amqpPassword);
connectionFactory.setVirtualHost(vhost);
return connectionFactory;
}
@Bean("orderListenerContainer")
@Primary
public RabbitListenerContainerFactory orderListenerContainer(@Qualifier("orderConnectionFactory") ConnectionFactory orderConnectionFactory) {
SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
container.setConnectionFactory(orderConnectionFactory);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setPrefetchCount(prefetch);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageConverter(new SimpleMessageConverter());
return container;
}
以此类推,如果有多个虚拟主机,则有多份上述的配置。 @Primary 注解标识在没有声明监听工厂时默认使用的
消费者代码
@RabbitListener(queues = {"order_exchange"}, containerFactory = "orderListenerContainer")
public void onMessage(Message message, Channel channel) throws Exception {
String msgBodyString = new String(message.getBody());
JSONObject json = JSONObject.parseObject(msgBodyString);
log.info("消费消息:{}", json);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
8.总结
上面有说明死信的条件,如没有死信队列,可能会导致死循环。 消息确认不通过或者异常会重新进入到队列头部,接下来消费者又会消费此条消息由此造成死循环。 因为消费者最好遵循以下几点: 1.捕获消息处理过程中的异常,一定要自己手动确认还是不确认(由其他消费者消费)
2.重试次数记录,如果规范消息协议应该是有次数字段的,消费者根据重试次数来做异常消息日志等操作
3.根据配置自动丢到死信队列,前提是创建队列时需要指定死信队列和key
listener:
simple:
concurrency: 5
prefetch: 10
retry:
enabled: true
max-attempts: 3
以上就是本章的全部内容了。
上一篇:RabbitMQ第一话 – docker安装RabbitMQ以及Springboot集成RabbitMQ 下一篇:RabbitMQ第三话 – RabbitMQ高可用集群搭建
贵有恒何必三更眠五更起,最无益只怕一日曝十日寒
|