在 RocketMQ 当中,消息发送方以及消息接收方都是配置命名服务(Name Server)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于 Kafka 直接连接 Broker 地址。命名服务的主要功能包含:Broker 管理以及消息的路由管理。具有如下:
Broker 管理 ,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着路由管理 ,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。
源码分析基于 RocketMQ - 4.9.3
1、NameServer 启动整体流程
从之前 3、RocketMQ 源码解析之 源代码环境搭建 这篇文章中我可以看到命名服务的启动类是:NamesrvStartup 。而 命名服务最重要的类其实是 NamesrvController ,它控制着命名服务的整个流程。命名服务启动其实就是调用NamesrvController 的三个方法的过程:
NamesrvController#<init> :通过有参构建器传入 NamesrvConfig 以及 NettyServerConfig 这两个类,创建NamesrvController 对象。NamesrvController#initialize :调用该方法对 NamesrvController 对象进行属性初始化。NamesrvController#start : 调用该方法完成对 NameServer 的启动,启动 NettyRemotingServer 暴露 Netty 服务端的 Socket 服务。
下面我们就来详细的分析一下这三个过程。
2、NamesrvController 对象构建
NamesrvController.init
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
在 NamesrvController 对象构建里面最核心的还是创建 RouteInfoManager 对象,这个对象里面保存了就完成了 Broker 管理 以及 消息的路由管理 。
下面我们来看一下这个对象里面的核心字段:
HashMap<String/* topic */, List<QueueData>> topicQueueTable :消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。HashMap<String/* brokerName */, BrokerData> brokerAddrTable :broker 名称以及 Broker 数据的映射信息。HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable :集群名称以及对应的 Broker 名称列表HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable :broker 地址对应 Broker 通道(Channel) 的对应信息。HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable :broker 地址以及消息的各种过滤机制。
这里包含了 Broker 对应的所有元数据信息。Broker 每过一段时间(默认 30 秒)就会向 NameServer 发送心跳,告诉 NameServer 我当前还是存活的。并且如果心跳超过 2 分钟没有发送就会把这个 Broker 从上面的 brokerLiveTable 列表当中移除。
RouteInfoManager#scanNotActiveBroker
public void scanNotActiveBroker() {
Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, BrokerLiveInfo> next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
上面就是 Broker 存活检测逻辑。
3、NamesrvController 初始化
下面我们来看一下 的初始化逻辑。
NamesrvController#initialize
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
上面的逻辑其实挺简单的,主要的逻辑如下:
kvConfigManager KV 类型的配置管理器从配置文件中加载配置- 创建 Netty 远程服务
NettyRemotingServer 暴露命名服务 Socket 通信 - 创建一个远程线程池
remotingExecutor 用于异步处理客户端的 Socket 连接请求,默认没 8 个工作线程。 registerProcessor() :向 NettyRemotingServer 远程服务端注册请求处理器(DefaultRequestProcessor ,这个处理器我们稍后在分析),并且使用上一步创建的 remotingExecutor 进行异步处理- 使用单个线程池调用
RouteInfoManager#scanNotActiveBroker 扫描并删除 2 分钟没有心跳的 Broker 以及 KVConfigManager#printAllPeriodically 打印kvConfigManager KV 类型的配置管理器里面的配置值。 - 如果服务器开启了 SSL 加密传输就启动
FileWatchService 定时扫描文件更新 SSL
DefaultRequestProcessor 其实就是整个 NameServer 处理 Socket 请求的类.其实最终就是操作 KVConfigManager 这个配置类或者 RouteInfoManager 这个类。
4、NamesrvController 启动
NamesrvController 的启动就相对比较简单了。
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
它的只包括了两个步骤:
NettyRemotingServer#start ,调用 Netty 的服务端 ServerBootstrap 进行服务端启动暴露 Socket 服务,默认是 9876 ;- 启动
FileWatchService 服务,当 SSL 有更新时,就更新 SSL 配置。
|