IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RocketMQ源码解析-Broker与Namesrv以及Consumer交互 -> 正文阅读

[Java知识库]RocketMQ源码解析-Broker与Namesrv以及Consumer交互

? 这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。

? 我们知道broker实例时需要注册到namesrv中,然后其他的生产者、消费者再从namesrv中获取到对应的Broker实例信息,例如broker保存了哪些topictopic对应的队列信息、此borker对应的ip信息等。

一、Broker的启动初始化

? 下面我们来看下broker启动后的初始化加载:

在这里插入图片描述

? 在这里初始化的时候,主要有两个信息的初始化加载,也就是topic信息、队列消费的offset信息,主要是加载持久化的文件信息。

1、topic信息文件

1)、topic的加载

this.topicConfigManager.load();

? 这里主要就是加载store\config下面的topics.json文件信息

在这里插入图片描述

public boolean load() {
    String fileName = null;
    try {
        fileName = this.configFilePath();
        String jsonString = MixAll.file2String(fileName);

        if (null == jsonString || jsonString.length() == 0) {
            return this.loadBak();
        } else {
            this.decode(jsonString);
            log.info("load " + fileName + " OK");
            return true;
        }
    } catch (Exception e) {
        log.error("load " + fileName + " failed, and try to load backup file", e);
        return this.loadBak();
    }
}

? 如果找不到,就会再去加载部分文件。

public void decode(String jsonString) {
    if (jsonString != null) {
        TopicConfigSerializeWrapper topicConfigSerializeWrapper =
            TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
        if (topicConfigSerializeWrapper != null) {
            this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
            this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
            this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
        }
    }
}

? 加载后就会将json转换为TopicConfigSerializeWrapper,再其放到topicConfigTable中。

public class TopicConfigSerializeWrapper extends RemotingSerializable {
    private ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>();
    private DataVersion dataVersion = new DataVersion();
{
	"dataVersion":{
		"counter":5,
		"timestamp":1645867253838
	},
	"topicConfigTable":{
		"cluster_queue_topic2":{
			"order":false,
			"perm":6,
			"readQueueNums":4,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"cluster_queue_topic2",
			"topicSysFlag":0,
			"writeQueueNums":4
		}
	}
}

? 这上面就是topics.json文件中的信息(已删除其他的topic信息),可以看到其保存的当前topic的权限、读取的队列数量等。

2)、topic的添加

? 上面是加载文件,当我们知道这个文件在创建topic或者删除的时候是需要更新的,例如创建topic的时候。

public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
    TopicConfig topicConfig = null;
    boolean createNew = false;
    try {
        if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
					........
                if (topicConfig != null) {
                    log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
                        defaultTopic, topicConfig, remoteAddress);
                    this.topicConfigTable.put(topic, topicConfig);
                    this.dataVersion.nextVersion();
                    createNew = true;
                    this.persist();
                }
            } finally {
                this.lockTopicConfigTable.unlock();
            }
        }
    } catch (InterruptedException e) {
        log.error("createTopicInSendMessageMethod exception", e);
    }

    if (createNew) {
        this.brokerController.registerBrokerAll(false, true,true);
    }

    return topicConfig;
}

? 例如新建topic后会将其添加到topicConfigTable中,然后再将其this.persist()添加到文件中,这个同样是ConfigManager的公共方法。同时如果这个topic是新建的,会再通过registerBrokerAll方法将其再更新到namesrv中,这个方法也是我们等下要梳理的。

2、消费的队列信息文件

消费队列是加载的store\config下面的consumerOffset.json文件信息

在这里插入图片描述

加载逻辑与后面的topic加载类似,都是同一个父类ConfigManager

{
	"offsetTable":{
		"%RETRY%cluster_consumer_queue_group@cluster_consumer_queue_group":{0:0
		},
		"cluster_queue_topic2@cluster_consumer_queue_group":{0:102,1:102,2:85,3:84
		},
		"%RETRY%simple_batch_consumer_group@simple_batch_consumer_group":{0:0
		}
	}
}

? 这个文件主要是记录topic对应的queue队列的offset消费位置。例如例如上面的

cluster_queue_topic2@cluster_consumer_queue_group":{0:102,1:102,2:85,3:84}

就是记录了消费组cluster_consumer_queue_group对应的topic-cluster_queue_topic2下面的4个队列消费的offset位置。

二、Broker的注册

1、broker的逻辑

? broker启动后,会将其的信息注册到namesrv中。其是在broker的启动方法的调用类BrokerController

public void start() throws Exception {
    if (this.messageStore != null) {
        this.messageStore.start();
    }

    if (this.remotingServer != null) {
        this.remotingServer.start();
    }
	.......
    if (this.brokerOuterAPI != null) {
        this.brokerOuterAPI.start();
    }
		.......
    this.registerBrokerAll(true, false, true);
		........
}
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,
    final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,final boolean oneway,final int timeoutMills,
    final boolean compressed) {
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);54
                        .........
                }
            });
        }
		........
    return registerBrokerResultList;
}

? 我们可以看到这里会将当前brokeripbrokerNameclusterNameTopicConfigSerializeWrapper等信息告诉namesrv。同时我们可以看到for (final String namesrvAddr : nameServerAddressList),如果多个namesrv的话,其都会进行注册。

2、namesrv逻辑

上面是broker注册的逻辑,下面我们就来看下namesrv的接收逻辑逻辑,其的处理逻辑主要是在DefaultRequestProcessor类做分发:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
		.......
    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
       		......
        default:
            break;
    }
    return null;
}

? 这里当前是走的registerBrokerWithFilterServer方法:

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
  		.........
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

? 可以看到这里主要是将请求的信息交给RouteInfoManager来处理,RouteInfoManager主要是将这些信息保存到其的对应成员变量中,这些内容可以看前面写的一篇文章:RocketMQ源码解析-NameServer篇

三、consumer与broker的逻辑

? 我们知道broker是消息实际保存落地的地方,所以,consumerproducer都是从namesrv获取到broker的对应信息,然后直接再去broker推送、或者获取消息内容。

1、消费者demo

public static void main(String[] args) throws MQClientException {

    DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("cluster_consumer_queue_group");
    defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
    defaultMQPushConsumer.setInstanceName("11");
    String simpleTopic = "cluster_queue_topic2";
    defaultMQPushConsumer.subscribe(simpleTopic,"*");
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.println("SimpleConsumer consumer Msg - " + JSONObject.toJSONString(messageExt));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    defaultMQPushConsumer.start();
}

2、消费者TopicRouteData获取

? 这里就是一个消费者基本运行。在其启动后会进行很多的初始化内容,例如从namesrv中获取对应的topic相关的队列信息,通过getTopicRouteInfoFromNameServer方法:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                   ...........
                } else {
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                if (topicRouteData != null) {
                    TopicRouteData old = this.topicRouteTable.get(topic);
              		...........
                    if (changed) {
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        .....
                    		..........
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    }
                } 
       		.......
    return false;
}

这里主要就是从namesrv中获取TopicRouteData信息,我们知道broker会将其的topic信息注册、更新到namesrv中。

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

其主要是topic的队列信息,以及这些在哪些broker上面(如果是多个broker集群,会有brokerName,及对应的ip信息),

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

在这里插入图片描述

3、offset获取

? 当获取到对应的broker对应的topicqueue队列相关的信息后,也需要获取queue对应的offset信息。

可以看到我们在demo中设置的是CONSUME_FROM_FIRST_OFFSET

@Override
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        	..........
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
      	........
    return result;
}
case READ_FROM_STORE: {
    try {
        long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
        AtomicLong offset = new AtomicLong(brokerOffset);
        this.updateOffset(mq, offset.get(), false);
        return brokerOffset;
    }
    // No offset in broker
    catch (MQBrokerException e) {
        return -1;
    }
    //Other exceptions
    catch (Exception e) {
        log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
        return -2;
    }
}

? 这里就是去这个MessageQueue对应的broker获取其的消费offset

private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
    InterruptedException, MQClientException {
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {

        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());

        return this.mQClientFactory.getMQClientAPIImpl().queryConsumerOffset(
            findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

获取到offset或其再通过this.updateOffset(mq, offset.get(), false),更新到offsetTable中:

private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>();
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
    if (mq != null) {
        AtomicLong offsetOld = this.offsetTable.get(mq);
        if (null == offsetOld) {
            offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
        }

        if (null != offsetOld) {
            if (increaseOnly) {
                MixAll.compareAndIncreaseOnly(offsetOld, offset);
            } else {
                offsetOld.set(offset);
            }
        }
    }
}

在这里插入图片描述

获取到MessageQueue后其再构建PullRequest集合,然后再去Broker拉取消息进行消费:

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
			.............
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
			.......
            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    this.dispatchPullRequest(pullRequestList);
    return changed;
}

在这里插入图片描述

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
 		.........
    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

这样再通过run()遍历pullRequestQueue,就会去拉取这些PullRequest对应的broker拉取消息消费了。

public class PullMessageService extends ServiceThread {
    private final InternalLogger log = ClientLogger.getLog();
    private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
 		..........
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

4、broker的推送逻辑

? broker的接收的处理主要是PullMessageProcessor类:

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
    throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
   		............
    final GetMessageResult getMessageResult =
        this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);
    if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());
 		..........
    boolean storeOffsetEnable = brokerAllowSuspend;
    storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
    storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
    if (storeOffsetEnable) {
        this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    }
    return response;
}

这里主要是获取Message,通过this.brokerController.getConsumerOffsetManager().commitOffset记录本次消费的offset

获取消息逻辑主要是通过DefaultMessageStore来处理,其也是通过offset计算Message所在的CommitLog文件再从中获取:

public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
    final int maxMsgNums,
    	......
    GetMessageResult getResult = new GetMessageResult();
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        minOffset = consumeQueue.getMinOffsetInQueue();
        maxOffset = consumeQueue.getMaxOffsetInQueue();
        ........
		else {
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
            if (bufferConsumeQueue != null) {
                try {
                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
							............
                        SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
						................
                        this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                        getResult.addMessage(selectResult);
                        status = GetMessageStatus.FOUND;
                        nextPhyFileStartOffset = Long.MIN_VALUE;
                    }
					........
                    nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
					.........
            } else {
                status = GetMessageStatus.OFFSET_FOUND_NULL;
                nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                    + maxOffset + ", but access logic queue failed.");
            }
        }
    } else {
        status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
        nextBeginOffset = nextOffsetCorrection(offset, 0);
    }
		..........
    getResult.setStatus(status);
    getResult.setNextBeginOffset(nextBeginOffset);
    getResult.setMaxOffset(maxOffset);
    getResult.setMinOffset(minOffset);
    return getResult;
}

? 这里主要是从commitLog文件中获取,这个在前面RocketMQ源码解析-Store篇有介绍。以上就是关于BrokerConsumernamesrv的一般流程交互的逻辑梳理。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-02-28 15:14:49  更:2022-02-28 15:16:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 11:40:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码