并发消费一失败重试
不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】
- 计算ackIndex
- 根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup
- 消费失败需要重发消息
- 如果发送到重试队列失败 则需要兜底重新消费[重试次数也增加]
- 移除ProcessQueue中处理过的消息
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
处理统计信息 以及计算ackIndex
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
核心: 重试逻辑: 如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
集群模式消费失败需要重发消息【先remotingClient发送,失败采用内部defaultProducer发送】
boolean result = this.sendMessageBack(msg, context);
如果发送到重试队列失败 则需要兜底重新消费
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
进行兜底消费
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
移除ProcessQueue中处理过的消息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
顺序消费一失败阻塞
- 失败时自旋阻塞消费
- 失败消费间隔1s
- 失败消费次数16次
- 超过16次消费失败,回发重试队列,不在进行消费
- 重试队列最多16次则按照重试时间间隔进行
- 重试队列16次均失败则存储死信队列
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
超过16次则发送到重试队列
if (checkReconsumeTimes(msgs)) {
将consumingMsgOrderlyTreeMap正在消费的消息重新添加到msgTreeMap
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
一秒后重新消费
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
......删除非自动提交代码
根据消费进度
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
总结
- 消费失败均会回传重试队列
- 回传是构建新的消息
- 顺序消费回发重试队列前 会自旋阻塞消费
- 消费失败处理完毕,需更新offsetStore消费进度管理
|