IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RocketMQ 消息路由解析——图解、源码级解析 -> 正文阅读

[Java知识库]RocketMQ 消息路由解析——图解、源码级解析

作者:recommend-item-box type_blog clearfix

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2022年6月4日

🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ?留言 📝 都是我最大的动力!


消息路由

在RocketMQ的系统架构里,由于服务器端(Broker)会根据实时压力实施弹性扩缩容等发生变动,客户端为了做负载均衡,就需要有注册中心来提供Broker的信息:
在这里插入图片描述
注册中心的作用是及时发现Broker服务器的变化,并将存活的Broker信息返回给客户端做负载均衡。

获取Topic

获取路由信息函数

// DefaultMQProducerImpl#tryToFindTopicPublishInfo
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());

发送消息前,必须先从注册中心里获取Broker服务器信息,包括Topic、队列、IP,然后采取负载均衡算法发送消息。

常见的负载均衡算法:

  1. 轮询法:将请求按照顺序轮流地分配到各个服务器上。
  2. 加权轮询法:在轮询算法的基础上添加了权重的条件
  3. 随机法
  4. 加权随机法
  5. 最小连接法:哪个服务器的连接数少,就分配给哪个服务器新的请求
  6. 哈希法:计算哈希值,映射到服务器上

tryToFindTopicPublishInfo

/**
 * 根据topic获取路由信息
 * 
 * @param topic
 * @return
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // 1 先从本地 topicPublishInfoTable 中获取路由信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    // 2 路由信息或 messageQueueList 为空
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        // 2.1 添加空路由对象
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 2.2 更新路由信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        // 2.3 从更新后的路由表中获取路由信息
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    // 2.4 获取到了就返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
        return topicPublishInfo;
    } else {
        // 3 没有获取到路由信息则从注册中心获取
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

从上面的源码可以看出获取路由信息的步骤如下:

  1. 先从本地topicPublishInfoTable中获取路由信息
  2. 如果路由信息或messageQueueList为空,则尝试本地更新一下路由信息
  3. 本地更新PublishInfo路由信息,并尝试获取
  4. 如果此时能获取到路由信息了,则返回TopicPublishInfo对象
  5. 本地无法获取到路由信息,则从注册中心尝试获取并更新本地缓存

Topic 路由信息表

上述过程的第一步就是获取路由信息

TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);

其中路由信息存储在TopicPublishInfo对象里:
在这里插入图片描述
各个字段含义如下:

  • orderTopicTopic是否支持排序
  • haveTopicRouterInfo:是否存在路由信息
  • messageQueueList:消息队列List
  • sendWhichQueue:生产者发送消息到哪个队列的索引
  • topicRouteData:路由数据,包括队列、Broker地址、Broker数据

此外,TopicPublishInfo类还提供了选择某个队列发送消息的默认负载均衡策略:

/**
  * 默认【轮询】策略选择一个MessageQueue
  *
  * @param lastBrokerName
  * @return
  */
 public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
     if (lastBrokerName == null) {
         return selectOneMessageQueue();
     } else {
         int index = this.sendWhichQueue.getAndIncrement();
         for (int i = 0; i < this.messageQueueList.size(); i++) {
             int pos = Math.abs(index++) % this.messageQueueList.size();
             if (pos < 0)
                 pos = 0;
             MessageQueue mq = this.messageQueueList.get(pos);
             if (!mq.getBrokerName().equals(lastBrokerName)) {
                 return mq;
             }
         }
         return selectOneMessageQueue();
     }
 }

 /**
  * 选择一个消息队列
  *
  * @return
  */
 public MessageQueue selectOneMessageQueue() {
     int index = this.sendWhichQueue.getAndIncrement();
     int pos = Math.abs(index) % this.messageQueueList.size();
     if (pos < 0)
         pos = 0;
     return this.messageQueueList.get(pos);
 }

从上面代码可以看出,默认的选择策略是采用轮询的方法:

  • lastBrokerName == null时,说明在此之前还没有进行过选择,直接返回第一个可用的消息队列
  • lastBrokerName != null时,且当前轮询到的消息队列不是上一次使用的,则返回当前队列,否则轮询至下一个



更新路由信息

两个子方法

根据tryToFindTopicPublishInfo的源码,接下来会进行更新路由信息的步骤,访问的主要是MQClientInstance类下的updateTopicRouteInfoFromNameServer方法,该方法又调用了两个关键的方法,分别是topicRouteData2TopicPublishInfotopicRouteData2TopicSubscribeInfo


1. topicRouteData2TopicPublishInfo方法的作用是将TopicRouteData类转换成TopicPublishInfo,并过滤掉Master挂了的Slave的MessageQueue

public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
        TopicPublishInfo info = new TopicPublishInfo();
        info.setTopicRouteData(route);
        // 如果指定了Topic的Queue的发送顺序
        if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
            // 解析配置文件,创建消息队列
            String[] brokers = route.getOrderTopicConf().split(";");
            for (String broker : brokers) {
                String[] item = broker.split(":");
                int nums = Integer.parseInt(item[1]);
                for (int i = 0; i < nums; i++) {
                    MessageQueue mq = new MessageQueue(topic, item[0], i);
                    info.getMessageQueueList().add(mq);
                }
            }
            // 设置Topic是有序的(消息的发送顺序按配置来)
            info.setOrderTopic(true);
        } else {
            List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            // 找到每个QueueData的BrokerData
            for (QueueData qd : qds) {
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }
                    // 如果BrokerData中没有Master节点id,可能Master挂了,此时不处理消息
                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }
                    // 创建消息队列
                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }
            }
            // 设置Topic消息发送是无序的
            info.setOrderTopic(false);
        }
        return info;
    }

  1. topicRouteData2TopicSubscribeInfo方法作用是提取TopicRouteData内的QueueData字段,生成消息队列,也就是订阅了该Topic的队列
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
        Set<MessageQueue> mqList = new HashSet<MessageQueue>();
        List<QueueData> qds = route.getQueueDatas();
        for (QueueData qd : qds) {
            // QueueData是否可读,只有是可读的才能被订阅
            if (PermName.isReadable(qd.getPerm())) {
                for (int i = 0; i < qd.getReadQueueNums(); i++) {
                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                    mqList.add(mq);
                }
            }
        }
        return mqList;
    }

介绍完了updateTopicRouteInfoFromNameServer方法里调用的两个子方法之后,下面就来看一下updateTopicRouteInfoFromNameServer的代码。


updateTopicRouteInfoFromNameServer

更新路由信息是消息投递过程中非常重要的一环,为了防止并发修改注册信息导致数据不一致,这里使用了ReentrantLock可重入锁。

对于路由消息,就需要注意它可能不存在这种情况

1. 路由消息不存在

第一次访问时,生产者还没有在Broker中创建Topic和消息队列时会发生,此时的解决方案是:如果满足isDefault && defaultMQProducer != null,则使用默认Topic来获取路由消息TopicRouteData

在这里插入图片描述
在这里插入图片描述
由上面两张图可以清楚看到,默认Topic名称为TBW102


但如果默认主题获取到的TopicRouteData实例为空呢?此时就要根据Topic名称从注册中心查询了,如果还查询不出来的话就会返回false


2. 路由消息不存在,但是从注册中心获取到了

此时就需要判断本地的路由表和注册中心获取到的路由信息是否有差异,如果差异存在话就把本地路由信息更新为最新版本


上面所有文字部分对应的源码如下:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 使用默认的TopicKey尝试获取TopicRouteData
                        // 当Broker开启自动创建Topic时,会自动进行创建
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                                1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                    if (topicRouteData != null) {
                        // 判断本地路由表存放的信息和远端注册中心存放的信息
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            // 克隆的原因是topicRouteData要被设置到下面的publishInfo和subscribeInfo里
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            // 更新Broker相关信息,当某个Broker心跳超时后,会被从BrokerData的BrokerAddrs中移除
                            // brokerAddrTable也存有Slave的BrokerAddr
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Map.Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Map.Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Map.Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Map.Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-08 18:52:54  更:2022-06-08 18:56:18 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 18:42:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码