? 这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。
? 我们知道broker实例时需要注册到namesrv 中,然后其他的生产者、消费者再从namesrv 中获取到对应的Broker 实例信息,例如broker保存了哪些topic 、topic 对应的队列信息、此borker 对应的ip信息等。
一、Broker的启动初始化
? 下面我们来看下broker 启动后的初始化加载:
? 在这里初始化的时候,主要有两个信息的初始化加载,也就是topic 信息、队列消费的offset 信息,主要是加载持久化的文件信息。
1、topic信息文件
1)、topic的加载
this.topicConfigManager.load();
? 这里主要就是加载store\config 下面的topics.json 文件信息
public boolean load() {
String fileName = null;
try {
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0) {
return this.loadBak();
} else {
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
}
} catch (Exception e) {
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
}
}
? 如果找不到,就会再去加载部分文件。
public void decode(String jsonString) {
if (jsonString != null) {
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
if (topicConfigSerializeWrapper != null) {
this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
}
}
}
? 加载后就会将json转换为TopicConfigSerializeWrapper ,再其放到topicConfigTable 中。
public class TopicConfigSerializeWrapper extends RemotingSerializable {
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
private DataVersion dataVersion = new DataVersion();
{
"dataVersion":{
"counter":5,
"timestamp":1645867253838
},
"topicConfigTable":{
"cluster_queue_topic2":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"cluster_queue_topic2",
"topicSysFlag":0,
"writeQueueNums":4
}
}
}
? 这上面就是topics.json 文件中的信息(已删除其他的topic 信息),可以看到其保存的当前topic 的权限、读取的队列数量等。
2)、topic的添加
? 上面是加载文件,当我们知道这个文件在创建topic 或者删除的时候是需要更新的,例如创建topic的时候。
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
........
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
this.persist();
}
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
}
? 例如新建topic 后会将其添加到topicConfigTable 中,然后再将其this.persist() 添加到文件中,这个同样是ConfigManager 的公共方法。同时如果这个topic 是新建的,会再通过registerBrokerAll 方法将其再更新到namesrv 中,这个方法也是我们等下要梳理的。
2、消费的队列信息文件
消费队列是加载的store\config 下面的consumerOffset.json 文件信息
加载逻辑与后面的topic 加载类似,都是同一个父类ConfigManager 。
{
"offsetTable":{
"%RETRY%cluster_consumer_queue_group@cluster_consumer_queue_group":{0:0
},
"cluster_queue_topic2@cluster_consumer_queue_group":{0:102,1:102,2:85,3:84
},
"%RETRY%simple_batch_consumer_group@simple_batch_consumer_group":{0:0
}
}
}
? 这个文件主要是记录topic 对应的queue 队列的offset 消费位置。例如例如上面的
cluster_queue_topic2@cluster_consumer_queue_group":{0:102,1:102,2:85,3:84}
就是记录了消费组cluster_consumer_queue_group 对应的topic -cluster_queue_topic2 下面的4个队列消费的offset 位置。
二、Broker的注册
1、broker的逻辑
? broker 启动后,会将其的信息注册到namesrv 中。其是在broker 的启动方法的调用类BrokerController 中
public void start() throws Exception {
if (this.messageStore != null) {
this.messageStore.start();
}
if (this.remotingServer != null) {
this.remotingServer.start();
}
.......
if (this.brokerOuterAPI != null) {
this.brokerOuterAPI.start();
}
.......
this.registerBrokerAll(true, false, true);
........
}
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,
final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,final boolean oneway,final int timeoutMills,
final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);54
.........
}
});
}
........
return registerBrokerResultList;
}
? 我们可以看到这里会将当前broker 的ip 、brokerName 、clusterName 、TopicConfigSerializeWrapper 等信息告诉namesrv 。同时我们可以看到for (final String namesrvAddr : nameServerAddressList) ,如果多个namesrv 的话,其都会进行注册。
2、namesrv逻辑
上面是broker 注册的逻辑,下面我们就来看下namesrv 的接收逻辑逻辑,其的处理逻辑主要是在DefaultRequestProcessor 类做分发:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
.......
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
} else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
......
default:
break;
}
return null;
}
? 这里当前是走的registerBrokerWithFilterServer 方法:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
.........
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
? 可以看到这里主要是将请求的信息交给RouteInfoManager 来处理,RouteInfoManager 主要是将这些信息保存到其的对应成员变量中,这些内容可以看前面写的一篇文章:RocketMQ源码解析-NameServer篇
三、consumer与broker的逻辑
? 我们知道broker 是消息实际保存落地的地方,所以,consumer 与producer 都是从namesrv 获取到broker 的对应信息,然后直接再去broker 推送、或者获取消息内容。
1、消费者demo
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("cluster_consumer_queue_group");
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultMQPushConsumer.setInstanceName("11");
String simpleTopic = "cluster_queue_topic2";
defaultMQPushConsumer.subscribe(simpleTopic,"*");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
System.out.println("SimpleConsumer consumer Msg - " + JSONObject.toJSONString(messageExt));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
defaultMQPushConsumer.start();
}
2、消费者TopicRouteData获取
? 这里就是一个消费者基本运行。在其启动后会进行很多的初始化内容,例如从namesrv 中获取对应的topic相关的队列信息,通过getTopicRouteInfoFromNameServer 方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
...........
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
...........
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
.....
..........
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
}
.......
return false;
}
这里主要就是从namesrv 中获取TopicRouteData 信息,我们知道broker 会将其的topic 信息注册、更新到namesrv 中。
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
private List<QueueData> queueDatas;
private List<BrokerData> brokerDatas;
private HashMap<String, List<String>> filterServerTable;
其主要是topic 的队列信息,以及这些在哪些broker 上面(如果是多个broker 集群,会有brokerName ,及对应的ip 信息),
public class BrokerData implements Comparable<BrokerData> {
private String cluster;
private String brokerName;
private HashMap<Long, String> brokerAddrs;
3、offset获取
? 当获取到对应的broker 对应的topic 、queue 队列相关的信息后,也需要获取queue 对应的offset 信息。
可以看到我们在demo 中设置的是CONSUME_FROM_FIRST_OFFSET
@Override
public long computePullFromWhere(MessageQueue mq) {
long result = -1;
final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
switch (consumeFromWhere) {
..........
case CONSUME_FROM_FIRST_OFFSET: {
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
result = lastOffset;
} else if (-1 == lastOffset) {
result = 0L;
} else {
result = -1;
}
break;
}
........
return result;
}
case READ_FROM_STORE: {
try {
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
}
catch (MQBrokerException e) {
return -1;
}
catch (Exception e) {
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
}
}
? 这里就是去这个MessageQueue 对应的broker 获取其的消费offset 。
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
} else {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
}
获取到offset 或其再通过this.updateOffset(mq, offset.get(), false) ,更新到offsetTable 中:
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
获取到MessageQueue 后其再构建PullRequest 集合,然后再去Broker 拉取消息进行消费:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext()) {
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
.............
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
.......
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
this.dispatchPullRequest(pullRequestList);
return changed;
}
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
.........
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
这样再通过run() 遍历pullRequestQueue ,就会去拉取这些PullRequest 对应的broker 拉取消息消费了。
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
..........
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
4、broker的推送逻辑
? broker 的接收的处理主要是PullMessageProcessor 类:
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
............
final GetMessageResult getMessageResult =
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
responseHeader.setMinOffset(getMessageResult.getMinOffset());
responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
..........
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}
return response;
}
这里主要是获取Message ,通过this.brokerController.getConsumerOffsetManager().commitOffset 记录本次消费的offset 。
获取消息逻辑主要是通过DefaultMessageStore 来处理,其也是通过offset 计算Message 所在的CommitLog 文件再从中获取:
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
......
GetMessageResult getResult = new GetMessageResult();
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
minOffset = consumeQueue.getMinOffsetInQueue();
maxOffset = consumeQueue.getMaxOffsetInQueue();
........
else {
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
............
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
................
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
........
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
.........
} else {
status = GetMessageStatus.OFFSET_FOUND_NULL;
nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
+ maxOffset + ", but access logic queue failed.");
}
}
} else {
status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
nextBeginOffset = nextOffsetCorrection(offset, 0);
}
..........
getResult.setStatus(status);
getResult.setNextBeginOffset(nextBeginOffset);
getResult.setMaxOffset(maxOffset);
getResult.setMinOffset(minOffset);
return getResult;
}
? 这里主要是从commitLog 文件中获取,这个在前面RocketMQ源码解析-Store篇有介绍。以上就是关于Broker 与Consumer 、namesrv 的一般流程交互的逻辑梳理。
|