🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年6月4日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ?留言 📝 都是我最大的动力!
消息路由
在RocketMQ的系统架构里,由于服务器端(Broker )会根据实时压力实施弹性扩缩容等发生变动,客户端为了做负载均衡,就需要有注册中心来提供Broker 的信息: 注册中心的作用是及时发现Broker服务器的变化,并将存活的Broker 信息返回给客户端做负载均衡。
获取Topic
获取路由信息函数
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
发送消息前,必须先从注册中心里获取Broker 服务器信息,包括Topic 、队列、IP,然后采取负载均衡算法发送消息。
常见的负载均衡算法:
- 轮询法:将请求按照顺序轮流地分配到各个服务器上。
- 加权轮询法:在轮询算法的基础上添加了权重的条件
- 随机法
- 加权随机法
- 最小连接法:哪个服务器的连接数少,就分配给哪个服务器新的请求
- 哈希法:计算哈希值,映射到服务器上
tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || (topicPublishInfo != null && topicPublishInfo.ok())) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
从上面的源码可以看出获取路由信息的步骤如下:
- 先从本地
topicPublishInfoTable 中获取路由信息 - 如果路由信息或
messageQueueList 为空,则尝试本地更新一下路由信息 - 本地更新
PublishInfo 路由信息,并尝试获取 - 如果此时能获取到路由信息了,则返回
TopicPublishInfo 对象 - 本地无法获取到路由信息,则从注册中心尝试获取并更新本地缓存
Topic 路由信息表
上述过程的第一步就是获取路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
其中路由信息存储在TopicPublishInfo 对象里: 各个字段含义如下:
orderTopic :Topic 是否支持排序haveTopicRouterInfo :是否存在路由信息messageQueueList :消息队列ListsendWhichQueue :生产者发送消息到哪个队列的索引topicRouteData :路由数据,包括队列、Broker地址、Broker数据
此外,TopicPublishInfo 类还提供了选择某个队列发送消息的默认负载均衡策略:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
从上面代码可以看出,默认的选择策略是采用轮询的方法:
lastBrokerName == null 时,说明在此之前还没有进行过选择,直接返回第一个可用的消息队列lastBrokerName != null 时,且当前轮询到的消息队列不是上一次使用的,则返回当前队列,否则轮询至下一个
更新路由信息
两个子方法
根据tryToFindTopicPublishInfo 的源码,接下来会进行更新路由信息的步骤,访问的主要是MQClientInstance 类下的updateTopicRouteInfoFromNameServer 方法,该方法又调用了两个关键的方法,分别是topicRouteData2TopicPublishInfo 和topicRouteData2TopicSubscribeInfo
1. topicRouteData2TopicPublishInfo 方法的作用是将TopicRouteData 类转换成TopicPublishInfo ,并过滤掉Master挂了的Slave的MessageQueue
public static TopicPublishInfo topicRouteData2TopicPublishInfo(final String topic, final TopicRouteData route) {
TopicPublishInfo info = new TopicPublishInfo();
info.setTopicRouteData(route);
if (route.getOrderTopicConf() != null && route.getOrderTopicConf().length() > 0) {
String[] brokers = route.getOrderTopicConf().split(";");
for (String broker : brokers) {
String[] item = broker.split(":");
int nums = Integer.parseInt(item[1]);
for (int i = 0; i < nums; i++) {
MessageQueue mq = new MessageQueue(topic, item[0], i);
info.getMessageQueueList().add(mq);
}
}
info.setOrderTopic(true);
} else {
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
if (PermName.isWriteable(qd.getPerm())) {
BrokerData brokerData = null;
for (BrokerData bd : route.getBrokerDatas()) {
if (bd.getBrokerName().equals(qd.getBrokerName())) {
brokerData = bd;
break;
}
}
if (null == brokerData) {
continue;
}
if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
continue;
}
for (int i = 0; i < qd.getWriteQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
info.getMessageQueueList().add(mq);
}
}
}
info.setOrderTopic(false);
}
return info;
}
topicRouteData2TopicSubscribeInfo 方法作用是提取TopicRouteData 内的QueueData 字段,生成消息队列,也就是订阅了该Topic的队列
public static Set<MessageQueue> topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {
Set<MessageQueue> mqList = new HashSet<MessageQueue>();
List<QueueData> qds = route.getQueueDatas();
for (QueueData qd : qds) {
if (PermName.isReadable(qd.getPerm())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
mqList.add(mq);
}
}
}
return mqList;
}
介绍完了updateTopicRouteInfoFromNameServer 方法里调用的两个子方法之后,下面就来看一下updateTopicRouteInfoFromNameServer 的代码。
updateTopicRouteInfoFromNameServer
更新路由信息是消息投递过程中非常重要的一环,为了防止并发修改注册信息导致数据不一致,这里使用了ReentrantLock 可重入锁。
对于路由消息,就需要注意它可能不存在这种情况
1. 路由消息不存在
第一次访问时,生产者还没有在Broker中创建Topic和消息队列时会发生,此时的解决方案是:如果满足isDefault && defaultMQProducer != null ,则使用默认Topic来获取路由消息TopicRouteData
由上面两张图可以清楚看到,默认Topic名称为TBW102
但如果默认主题获取到的TopicRouteData 实例为空呢?此时就要根据Topic名称从注册中心查询了,如果还查询不出来的话就会返回false
2. 路由消息不存在,但是从注册中心获取到了
此时就需要判断本地的路由表和注册中心获取到的路由信息是否有差异,如果差异存在话就把本地路由信息更新为最新版本
上面所有文字部分对应的源码如下:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Map.Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
{
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Map.Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put TopicRouteData[{}]", cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LockTimeoutMillis);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
|