欢迎并感谢浏览卢小龙的本篇文章
你好!亲爱的读者,本篇文章将主要记录本人有次在操作一个系统时,对RabbitMQ消费方式不了解导致的问题及解决,方便自己的学习历程记录以及复习参考,若有言错之处,请各位给予指点.
问题描述
当时业务需求时需要将用户认证的信息与活动号进行关系绑定,笔者这边的活动系统需要对统一认证系统的mq消息进行消费
@Slf4j
@Component
@RabbitListener(queue = RabbitTopicConfig.AUTH_ACTIVITY_QUEUE_NAME)
public class ActivityAttentionConsumer{
@Resource
private IMaActAttentionService actAttentionService;
@Log(value = "绑定信息MQ >> 活动信息与用户关联")
@RabbitHandler
public void registerBindingActivity(@Payload byte[] param){
Map<String, Object> json = deserilizableForMapFromFile(new String(param), String.class);
log.info("---->>>接收参数:{}", json);
this.validParams(json);
actAttentionService.saveUserToActRelation(json);
}
}
如上所示,是最正常的mq消费端的一段代码,但是消费后进入到RabbitMQ页面发现队列中该消息还存在,未被消费
问题排查
其实该问题主要由于笔者之前从未接触过手动提交消息的情况,在这之前都是在配置文件中对mq的信息自动提交消费。
于是笔者去Apollo配置中心查看了该系统对RabbitMQ的配置,果然发现了一个手动提交的配置
rabbitmq:
listener:
simple:
acknowledge-mode: manual #manual采用手动应答方式,none不确认,auto自动确认(默认方式)
concurrency: 1 #最小消费者数量
max-concurrency: 1 #最大消费者数量
retry:
enabled: true #是否支持重试
当开启该配置后,RabbitMQ消费端对消息消费后若不提交,则MQ中心不会接收到对该消息的消费动作,所以该消息会一直存在队列中,需要进行手动提交!
问题解决
这个问题的解决途径无非就两种:
在笔者搜寻信息后了解到,自动提交虽然会比较便捷,但是会存在消息丢失的风险,为了避免消息丢失,要在消息业务完毕后再手动确认消息消费。
代码修改如下:
@Log(value = "绑定信息MQ >> 活动信息与用户关联")
@RabbitHandler
public void registerBindingActivity(@Payload byte[] param, @Headers Map<String, Object> headers, Channel channel) throws IOException{
try{
long tag = (long)headers.get(AmqpHeaders.DELIVERY_TAG);
Map<String, Object> json = deserilizableForMapFromFile(new String(param), String.class);
log.info("---->>>接收参数:{}", json);
this.validParams(json);
actAttentionService.saveUserToActRelation(json);
channel.basicAck(tag, false);
}catch(Exception e){
log.error("###用户信息与活动信息绑定:" + e.getMessage, e);
channel.basicNack(tag, false, false);
}
}
这里需要介绍一下 basicAck 和 basicNack 这两个方法
其中basicAck中的两个参数
- deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
- multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
现在改为设置成手动ack应答(channel.basicAck方法),这样做的目的是保证消息在正确消费后给回馈,说明我正确消费了。这时队列就可以把这条消息删除了,如果消费端接收了消息,但是没有给返回ack应答,那么这条消息会继续存在unacked状态下,占据队列的空间,等到空间满了,就会出现接下来的消息不能被消费的情况。
那么现在问题又来了,正确的消息被ack 了,那么在消费过程中有异常了怎么办,这条消费肯定就不能返回ack应答了,这时就需要channel.basicNack方法了,这个方法解决了消费异常情况下该条消息怎么处理,有两种办法:第一,这条消息重新放回队列重新消费,第二,抛弃此条消息。那么具体使用哪个方法,这种情况下,建议捕捉异常类型,判断是哪种异常,再做具体处理。
使用channel.basicNack是否将此消息返回队列就由basicNack()方法的第三个参数requeue来决定了,这个地方当时笔者也是踩了一个坑,该参数要设置为false。如果该参数设置为true,那么消费失败的消息返回队列中会一直造成循环消费,一直失败一直消费,造成一个死循环。如果怕偶然性得话,该参数设置为true,则一定要配置消费次数的上限控制,若超出指定次数则直接抛弃掉。
经过手动确认方法处理后,发现队列中的消息被消费了。
|