IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Consumer上报消费位点分析 -> 正文阅读

[大数据]Consumer上报消费位点分析

1. 前言

在消息中间件中,消费者对于消费成功的消息,一般是需要返回ACK给Broker的,它的目的是让Broker知道消息已经被成功消费,不必再投递给其它消费者重试了。在RocketMQ中,这一过程的具体实现为「上报消费位点」,RocketMQ没有办法针对单个消息返回ACK,Consumer只能上报MessageQueue已经消费的消息偏移量。
?

Consumer实例启动时,同一Group下所有的实例都会进行「重平衡」操作,给自身重新分配MessageQueue,默认的分配策略就是「平均分配」,这意味着,同一个MessageQueue只会被一个Consumer实例消费,即同一个MessageQueue只会有一个Consumer上报消费位点,不存在冲突问题。但是,Consumer实例是多线程并发消费消息的,请大家思考这样一个问题:**线程A和线程B并发消费同一个队列,拉取到的消息为M1、M2,消息对应的Offset为1、2,线程B成功消费了M2,但是线程A还未消费M1,此时Consumer该如何上报消费位点?**直接上报2的话,M1还没消费,一旦消费失败,M1存在丢失的可能,RocketMQ是如何做的呢?
?

2. OffsetStore

阅读RocketMQ的源码就会发现,OffsetStore是RocketMQ用来管理Consumer消费位点的接口,通过接口定义我们就能知道它具备哪些功能。

public interface OffsetStore {
    /**
     * 加载消费进度
     * 1.集群:不用加载,进度由Broker维护
     * 2.广播:从本地文件加载
     */
    void load() throws MQClientException;

    /**
     * 更新队列消费进度
     * @param mq
     * @param offset
     * @param increaseOnly 更新为offset还是在原基础上递增?
     */
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

    /**
     * 获取消费进度
     * @param mq
     * @param type 获取类型:内存、磁盘文件
     * @return
     */
    long readOffset(final MessageQueue mq, final ReadOffsetType type);

    /**
     * 持久化所有队列的消费进度
     * 1.集群:上报给Broker
     * 2.广播:写入本地文件
     * 上报消费进度时,先写入内存,定时任务5秒钟执行一次。
     * @param mqs
     */
    void persistAll(final Set<MessageQueue> mqs);

    /**
     * 持久化单个队列的消费进度
     * @param mq
     */
    void persist(final MessageQueue mq);

    /**
     * 删除队列的消费进度
     */
    void removeOffset(MessageQueue mq);

    /**
     * 克隆给定Topic下的队列消费进度情况
     * @param topic
     * @return
     */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

    /**
     * 上报消费进度给Broker,仅针对集群消费
     * @param mq
     * @param offset
     * @param isOneway
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
}

核心关注的方法有三个,本文会重点分析:

方法说明
load从磁盘加载消费位点,针对广播模式
updateOffset更新队列消费位点
persistAll持久化消费位点

RocketMQ消息消费模式有两种:集群消费和广播消费,对应的实现类分别是RemoteBrokerOffsetStore和LocalFileOffsetStore。从名字也能看得出来,集群模式下消费进度由Broker管理,广播模式下消费进度由客户端自己管理。

2.1 LocalFileOffsetStore

广播模式下,消息需要被所有的Consumer实例消费,因此每个实例的消费进度是不一样的,所以消费进度会交给Consumer客户端自己管理,消费进度会以Json的方式持久化到磁盘文件,默认的路径是home/.rocketmq_offsets/${ClientId}/${groupName}/offsets.json,Consumer启动时会从本地磁盘文件加载,对应的方法是readLocalOffset()

private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
    String content = null;
    try {
        // 读取持久化文件
        content = MixAll.file2String(this.storePath);
    } catch (IOException e) {
        log.warn("Load local offset store file exception", e);
    }
    if (null == content || content.length() == 0) {
        // 持久化时,会先将旧数据备份到.bak文件
        // 如果正式文件加载失败,会尝试通过备份文件恢复
        return this.readLocalOffsetBak();
    } else {
        OffsetSerializeWrapper offsetSerializeWrapper = null;
        try {
            offsetSerializeWrapper =
                OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
        } catch (Exception e) {
            log.warn("readLocalOffset Exception, and try to correct", e);
            return this.readLocalOffsetBak();
        }

        return offsetSerializeWrapper;
    }
}

从磁盘读取Json文件,然后将队列的消费进度放到Map容器中。

OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null 
    && offsetSerializeWrapper.getOffsetTable() != null) {
    offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
}

当Consumer成功消费消息后,会根据消息的Offset去更新消费位点。广播模式下,只需要修改Map容器里的值即可。

@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            // 不存在的队列,写入Offset
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }
        if (null != offsetOld) {
            if (increaseOnly) {
                // 在原来的基础上底层
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}

updateOffset只是修改了内存中的数据,Consumer会启动定时任务,5秒钟进行一次持久化。广播模式下,持久化就是将Map容器中的数据以Json的方式写入磁盘文件,代码就不贴了。

2.2 RemoteBrokerOffsetStore

集群模式下,消息只需要被其中一个Consumer实例消费即可,相同ConsumerGroup消费进度是一致的,所以消费进度会交给Broker管理。
?

由于消费进度是由Broker负责管理的,所有RemoteBrokerOffsetStore实现类的load()方法是空的,什么也不做。
?

当Consumer成功消费消息后,同样会调用updateOffset方法更新队列消费位点,代码同上。此时只是将消费位点写入内存里的Map容器,并不会立即上报给Broker。因为消息的消费是一个很频繁的动作,如果每次消费都上报位点,对Consumer和Broker的压力都是很大的。
?

如果updateOffset后没有调用persistAll方法服务就挂了,就会导致Broker认为那些已经消费的消息还没有被消费,从而造成消息的重复投递,不过没关系,RocketMQ本身是允许消息的重复投递的,它的服务质量是「至少一次,允许多次」。
?

Consumer启动的定时任务会5秒进行一次持久化,对于集群模式而言,所谓的“持久化”就是上报队列的消费位点给Broker。在persistAll方法中,Consumer会遍历offsetTable,按个将MessageQueue的消费位点进行上报。上报位点的方法是updateConsumeOffsetToBroker()。上报消费位点需要和Broker交互,涉及到网络请求,默认使用「单向请求」,不关心上报是否成功,即使不成功也没关心,大不了消息重复消费嘛。

/**
 * 上报消费位点到Broker
 * @param mq 队列
 * @param offset 消费位点
 * @param isOneway 是否单向请求?
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // 查找Broker地址
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        // 构建请求Header
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        // 发送请求给Broker
        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

3. 上报流程

在这里插入图片描述

PullMessageService拉取到消息后会提交消费请求到ConsumeMessageService,唤醒消息消费线程开始消费消息,再根据消息消费状态去处理消费结果。消费成功就好处理了,直接更新消费位点即可。对于消费失败的情况,在广播模式下,消费失败的消息会记录日志然后直接丢弃;在集群模式下,消费失败的消息会回传给Broker,等待重新投递。

处理消费结果的方法是processConsumeResult,ProcessQueue对象缓存了Consumer拉取到的消息,它底层采用TreeMap存储,Key是消息Offset,Value是消息。TreeMap用的是红黑树结构,它的Key是有序的,从小到大排序。消费成功的消息会从TreeMap中删除,Consumer每次上报的消费位点永远是TreeMap的最小的Key,即最小的消息Offset。这就解答了开篇提的问题,线程B虽然消费了M2,但是M1还没消费,TreeMap中的firstKey仍然是1,这时是不会上报消费位点2的,只有M1也被消费了,才会随着2一起上报。

/**
 * 处理消费结果
 * 1.相关数据统计
 * 2.根据ackIndex将失败消息回传给Broker
 * 3.回传失败,提交异步任务,5s后重试消费
 * 4.上报消费位点
 */
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest) {
    
    // ackIndex前的消息代表成功,之后的代表失败
    int ackIndex = context.getAckIndex();
    if (consumeRequest.getMsgs().isEmpty())
        return;

    // 统计数据
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            // 消费失败,重置ackIndex,整批消息都需要重新处理
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:// 广播模式下,直接丢掉
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:// 集群模式下,回传给Broker
            // 消息回传失败列表
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            // ackIndex之前的消息代表消费成功,之后的消息失败需要发回Broker
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 回传给Broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    // 回传失败,暂存到msgBackFailed,稍后重新消费
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                // 回传失败的消息从msgs删除,不会上报失败消息的位点
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // 提交消费请求,5s后消费重试
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // 上报消费位点,不包含回传Broker失败的消息
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

消费失败且回传给Broker失败的消息,不会从ProcessQueue中删除,也就不会上报消费位点。

4. 总结

RocketMQ通过OffsetStore接口来管理消费进度,广播模式下由Consumer客户端自己管理,集群模式下由Broker负责管理。Consumer消费消息后仅仅会更新内存中的消费位点,由定时任务负责5秒进行一次持久化。广播模式的持久化是将数据写入磁盘文件,集群模式的持久化是将消费位点上报给Broker。
?

Consumer底层采用TreeMap来存储拉取到的消息,Key是消息Offset、Value是消息本身,所以它是根据消息的Offset从小到达排列的,上报消费位点时,上报的总是firstKey,即最小Offset。并发消费时,即使Offset较大的消息先被消费了也没关系,Offset并不会被直接上报。这带来了另一个问题,存在漏报的可能,会导致消息重复消费,所以Consumer对于较敏感的数据,需要做好消费的幂等性。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-11 17:35:16  更:2021-10-11 17:37:29 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:26:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码