前言
SpringBoot的集成RabbitMQ很简单,引入starer,简单配置几个属性就能开始使用,本人在平时工作使用过程中也遇到过一些坑,所以这篇文章主要罗列一些个人的使用建议。如有不对的地方或者好的建议,欢迎指正
一、SpringBoot集成RabbitMQ
1.引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置属性
在application.properties添加配置
spring.rabbitmq.addresses=127.0.0.1:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3.代码示例
@RestController
public class RabbitController {
@Autowired
RabbitTemplate rabbitTemplate;
AtomicInteger next = new AtomicInteger();
@GetMapping("/send")
public void send(@RequestParam(value = "routingkey", required = false, defaultValue = "test") String routingKey) {
Rabbit rabbit = new Rabbit();
rabbit.setId(next.getAndIncrement());
rabbit.setName(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("test", routingKey, JSONObject.toJSONString(rabbit));
}
@RabbitListener(queues = "test")
public void handler(Message message) {
System.out.println("收到消息:" + new String(message.getBody()));
}
public static class Rabbit implements Serializable {
private static final long serialVersionUID = -1L;
private Integer id;
private String name;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
- 项目运行起来后,请求/send接口就会往名为test的交换器发送一个Rabbit对象,routingkey为test的,
- RabbitMQ服务收到后就会路由到相应绑定的队列
- @RabbitListener注解会开启消费者线程,接收名为test的队列发送过来的消息
二、使用建议
1.发送方消息序列化器选择
1.1 使用默认SimpleMessageConverter
通常发送方通过rabbitTemplate.convertAndSend()来发送消息,默认的序列化器是SimpleMessageConverter,为了方便调试、格式统一,都用json格式,所以发送的时候要手动转换成json。
rabbitTemplate.convertAndSend("test", routingKey, JSONObject.toJSONString(rabbit));
1.2 使用Jackson2JsonMessageConverter
如果不想每次手动把对象转成json,可以使用Jackson2JsonMessageConverter,这时候需要配置一下rabbitTemplate
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
rabbitTemplate.convertAndSend("test", routingKey, rabbit);
使用Jackson2JsonMessageConverter后,不要把对象转成json字符串后发送,例如:rabbitTemplate.convertAndSend(exchange, routingKey, JSONObject.toJSONString(rabbit)),这样Jackson2JsonMessageConverter会对字符串再做一次json序列化,会得到带转义字符的字符串,如下图 所以这里就看到了使用Jackson2JsonMessageConverter的一个小坑,如果想发送一个字符串,就不能用convertAndSend方法了,要用send方法。
1.3 小结
从上面分析来看,使用SimpleMessageConverter会更通用一些,容错性更好,虽然每次发送对象的时候需要手动转json,但挺符合人的惯性思维。当然,已经在运行的线上程序就不要轻易改动了,又不是不能用!(手动狗头)
2.消费方序列化器选择
2.1 建议使用默认SimpleMessageConverter
使用Message来接收,可以拿到消息所有的信息(如消息头、body等),然后再进一步转成字符串或者对象。
@RabbitListener(queues = "test")
public void handler(Message message) {
System.out.println("收到消息:" + new String(message.getBody()));
}
PS:消费方使用SimpleMessageConverter,就不要用对象来接受信息,因为不能确保发送方也是用SimpleMessageConverter
2.2 为什么不建议使用Jackson2JsonMessageConverter
@RabbitListener(queues = "test")
public void handler(Rabbit rabbit) {
System.out.println("收到消息:" + rabbit);
}
@RabbitListener(queues = "test")
public void handler(Message message) {
System.out.println("收到消息:" + new String(message.getBody()));
}
如上述代码示例,
- 使用第一种,首先这样子拿不到完整消息内容,不利于排查问题。 如果别人发的是字符串,消息头没有content-type或者不是application/json,就会报错;也可能别人发的消息头content-type是application/json,而消息内容并不是Rabbit类,虽然不报错,但字段可能会对应不上,出现些奇怪的问题。
- 使用第二种,如果消息头没有content-type或者不是application/json,会打印一个告警信息Could not convert incoming message with content-type [xxx], ‘json’ keyword missing。还有个更坑的是,如果别人发的消息头中__type_id__ 字段,且是一个自己项目中不存在的类,那会报类找不到的错误。
2.3 小结
消费方使用默认SimpleMessageConverter就好,且以Message作为参数来接收。
PS 这时候不要给spring容器注入一个Jackson2JsonMessageConverter实例,否则消费者就会从spring容器里拿它来用,而不是用默认的SimpleMessageConverter
3.发送方的和消费方使用不同的connection
如图所示,客户端和RabbitMQ是保持着一个长连接connection,然后通过channel通信,一个connection默认最大可以支持2047个channel。所以当发送者和消费者共用一个connection,消费者又占用了过多的channel,发送者可能会无法创建或获取到空闲的channel来发送消息。
通过下面一句代码就可以实现
rabbitTemplate.setUsePublisherConnection(true);
4.避免开过多线程同时发送消息
前面说到一个connection的channel数量是有上限的,spring帮我们实现了一个channel缓存池,以达到channel复用的效果。假如一时间有3000个线程同时调rabbitTemplate.convertAndSend()方法,channel缓存池会瞬间被掏空,从缓冲池拿不到就会创建新的channel,3000已经超过2047了,就会报一个channelMax的错误,如果没有失败重试机制,消息就丢失了。 如果线上出现channelMax错误,去改RabbitMQ的配置要重启是不太现实的,可以通过下面代码暂时缓解,后面再优化代码,避免这种不好的做法
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
connectionFactory.setChannelCacheSize(100);
connectionFactory.setChannelCheckoutTimeout(1000);
return rabbitTemplate;
}
通过setChannelCheckoutTimeout这个方法,可以激活spring的channel缓存池的限流功能(通过信号量实现),一般还要用setChannelCacheSize设置一个合理的channel数量(默认是25)
5.自行实现发送失败重试机制
5.1 spring的重试机制
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.multiplier=1
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-interval=10000ms
虽然spring提供了RetryTemplate的重试机制,配置起来也简单,但它的重试会阻塞当前线程,我们使用MQ的地方一般都是流量大需要削峰的场景,如果出现发送失败的情况,这种重试的形式性能会很差,而且重试的数据就在内存中,如果服务宕机,消息就丢失了。
5.2 自行实现
可以通过setConfirmCallback()获取是否发送到exchange的回调结果,而setMandatory(true)、setReturnCallback()则是可以拿到exchange无法路由到任意队列的消息。
rabbitTemplate.setConfirmCallback();
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback();
大致思路是:
- 发送消息前,先把消息暂存起来(如数据库、redis等),然后发消息到MQ;
- 如果回调结果是成功,则把之前暂存的消息删掉,失败的话不用处理;
- 另外再起一个重试线程,定时重试之前暂存起来的消息,因为回调成功消息会删掉,所以重试的消息是没收到回调或者回调失败的;
6.多数据源时,指定具体数据源
为了方便,我们常常会在代码里配置queue、exchange、binding,如果我们连了多个MQ,如果不指定在那个MQ上创建,就会在每个MQ服务器上都创建一份queue、exchange、binding。
@Bean("rabbitAdmin1")
public RabbitAdmin rabbitAdmin1(@Qualifier("conn2")CachingConnectionFactory c){
return new RabbitAdmin(c);
}
@Bean("rabbitAdmin2")
public RabbitAdmin rabbitAdmin2(@Qualifier("conn2") CachingConnectionFactory c){
return new RabbitAdmin(c);
}
@Bean
public Exchange exchange(){
return ExchangeBuilder.fanoutExchange("xx").admins("rabbitAdmin1").build();
}
@Bean
public Queue queue(){
Queue queue = QueueBuilder.durable("xxx").build();
queue.setAdminsThatShouldDeclare("rabbitAdmin1");
return queue;
}
@Bean
public Binding binding(Exchange exchange, Queue queue){
Binding binding = BindingBuilder.bind(queue).to(exchange).with("xxx").noargs();
binding.setAdminsThatShouldDeclare("rabbitAdmin1");
return binding;
}
如上述代码,admins()、setAdminsThatShouldDeclare()方法指定了在哪个数据源上创建、绑定。
我们可能经常要订阅别的系统的消息,别人提供exchange,我们自行创建queue与其绑定。之前遇到的一个坑是,代码里没有指定在对方的MQ服务器进行绑定,那所有的MQ服务器都会进行尝试绑定,有的MQ服务器不存在同名的exchange的话就会报错了。
7.择机使用懒惰队列
创建队列的时候,如果带上x-queue-mode=lazy的参数,那么RabbitMQ就会把收到的消息存入磁盘,需要用的时候再加载到内存。适用于消息堆积量大,但消费速度又很慢的的场景,这样做可以降低MQ服务器的内存使用量,缺点就是损失一些消费性能。 代码示例如下
@Bean
public Queue queue(){
Queue queue = QueueBuilder.durable("xxx").withArgument("x-queue-mode","lazy").build();
queue.setAdminsThatShouldDeclare("rabbitAdmin1");
return queue;
}
8.消费者配置建议
8.1 线程数配置
@RabbitListener(queues = "test", concurrency = "3")
public void handler(Message message) {
System.out.println("收到消息:" + new String(message.getBody()));
}
spring会根据concurrency的值创建相应数量的线程,首先我们都知道线程数并不是越多越好,线程切换有消耗,另外其底层是每个线程会占用一个channel,做长轮询操作,如果设置过多线程就浪费资源了。总之,要根据实际业务场景考虑。
8.2 重试配置
8.2.1 使用spring的重试机制
如果消费者收到消息后,都是在同一个线程里处理事情的话,对吞吐率要求不高的话,个人建议用spring提供的重试机制就可以了,配置也很简单,如下:
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.max-attempts=3
spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
acknowledge-mode=AUTO这个配置可以保证我们处理消息完成后,spring才会向RabbitMQ发送ack,RabbitMQ才会把这条消息删除;而如果重试多次后还是处理异常,spring也会向RabbitMQ发送nack,我们可以通过设置死信队列来保证消息不丢失。
PS:看RabbitMQ的客户端SDK发现个spring不同的地方,就是消费者的autoAck参数,如果是true,是不用回ack的意思,即MQ服务器发送消息给客户端就删掉消息了;如果是false,则客户端要回复ack后,MQ服务器才会删掉消息。 String basicConsume(String queue, boolean autoAck, Consumer callback)
8.2.2 自行重试
使用spring提供的重试机制,是会阻塞线程,直到成功或者超过重试次数,线程才能干其他活,吞吐率低,可以自行实现重试
spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
spring.rabbitmq.listener.simple.retry.enabled=false
配置上acknowledge-mode=AUTO同时禁用消费者的重试机制,大致思路是,先把处理失败的消息暂存起来,过一段时间后再进行重试,下面列举一些方案: 1) 如果MQ服务器支持rabbitmq_delayed_message_exchange插件,可以用它来发送延时消息。
@Bean
public Exchange exchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
final CustomExchange customExchange = new CustomExchange("xxx", "x-delayed-message", true, false, args);
return customExchange;
}
rabbitTemplate.convertAndSend("exchange", "routingkey", msg, msg -> {
msg.getMessageProperties().setDelay((int) TimeUnit.MINUTES.toMillis(1));
msg.getMessageProperties().setPriority(10);
return msg;
});
上面的代码首先是在创建Exchange的时候需要定义type为x-delayed-message,其次发送消息的时候要在属性上加上x-delay=xx。这样做,失败的消息会延时重新投递回我们的队列,达到重试的效果。当然我们也不能无限重试,所以要在消息上记录重试次数,超过次数就不再重试了。
2)利用死信队列机制。可以把失败的消息投递到另一个队列,改队列的死信队列指定为我们原先的队列,同时消息设置一个过期时间,等时间到了,RabbitMQ就自动帮我们把消息重新投递回原先的队列。
3)失败的消息记录在数据库、redis等,定时任务重试。
个人比较喜欢用第一种,实现起来简单。
|