IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> rocketmq核心源码分析第二十篇一消息消费五步曲一消费失败并发重试与顺序阻塞 -> 正文阅读

[Java知识库]rocketmq核心源码分析第二十篇一消息消费五步曲一消费失败并发重试与顺序阻塞

并发消费一失败重试

不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】

  • 计算ackIndex
  • 根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup
  • 消费失败需要重发消息
  • 如果发送到重试队列失败 则需要兜底重新消费[重试次数也增加]
  • 移除ProcessQueue中处理过的消息
public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;
    处理统计信息 以及计算ackIndex
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    核心: 重试逻辑:  如果是消费失败,根据消费模式(集群消费还是广播消费),广播模式,直接丢弃,集群模式发送 sendMessageBack
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            根据ack index 来决定是否发送到重试队列 %RETRY%+consumeGroup
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                集群模式消费失败需要重发消息【先remotingClient发送,失败采用内部defaultProducer发送】
                boolean result = this.sendMessageBack(msg, context);
                如果发送到重试队列失败 则需要兜底重新消费
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            进行兜底消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // 这里需要注意 如果sendMessageBack 发送失败 则会尝试[延迟5s]重新消费
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    移除ProcessQueue中处理过的消息
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    不管消费成功与否 都会更新消费进度 【对于broker来说 没有失败 消息都会消费成功,其实就是修改消费偏移量,consume端消费失败的会在重试主题创建新的消息】
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

顺序消费一失败阻塞

  • 失败时自旋阻塞消费
  • 失败消费间隔1s
  • 失败消费次数16次
  • 超过16次消费失败,回发重试队列,不在进行消费
  • 重试队列最多16次则按照重试时间间隔进行
  • 重试队列16次均失败则存储死信队列
 public boolean processConsumeResult(
            final List<MessageExt> msgs,
            final ConsumeOrderlyStatus status,
            final ConsumeOrderlyContext context,
            final ConsumeRequest consumeRequest
    ) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    // 默认自动提交
    if (context.isAutoCommit()) {
        switch (status) {
            case COMMIT:
            case ROLLBACK:
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit();
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                超过16次则发送到重试队列
                if (checkReconsumeTimes(msgs)) {
                    将consumingMsgOrderlyTreeMap正在消费的消息重新添加到msgTreeMap
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    一秒后重新消费
                    this.submitConsumeRequestLater(
                            consumeRequest.getProcessQueue(),
                            consumeRequest.getMessageQueue(),
                            context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
        ......删除非自动提交代码
    根据消费进度
    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}

总结

  • 消费失败均会回传重试队列
  • 回传是构建新的消息
  • 顺序消费回发重试队列前 会自旋阻塞消费
  • 消费失败处理完毕,需更新offsetStore消费进度管理
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-02-05 21:35:51  更:2022-02-05 21:36:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:51:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码