详细介绍了RocketMQ的NameServer启动流程源码解析,包括RocketMQ的RPC通信模型。
0 NameServer概述
我们要先学会了如何使用RocketMQ,并且看了官方文档之后再来看源码,那样就能节约很多的时间。比如RocketMQ的架构设计,以及Apache RocketMQ开发者指南。
下面是RocketMQ的架构设计图:
在官方文档中可以得知,NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
-
Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活; -
路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
从上面可以得知,无论是Producer、Conumser、Broker都会直接和NameServer进行通信,实际上NameServer是非常重要的角色。但是由于大多数程序员都是一个“使用者”的角色,而使用RocketMQ的API的时候一般也不会接触到NameServer,因此对于NameServer的了解较少。更多的NameServer的特性需要查看官方文档。
实际上RocketMQ在部署启动时,会首先启动NameServer,因此本系列的源码分析文章中,将会以NameServer的启动作为入口,一步步的向后分析Broker、Consumer、Producer的核心源码。
1 NamesrvStartup启动入口
NameServer的启动入口就是namesrv模块的NamesrvStartup类的main方法。该方法将会创建并且初始化一个NamesrvController实例。
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
可以看到入口方法还是比较简单的,我们主要看createNamesrvController创建以及start启动NamesrvController的两个方法的源码。
2 createNamesrvController创建NamesrvController
该方法主要是解析命令行,加载NameServer配置和NettyServer各种配置(解析命令行中-c指定的配置文件)并保存起来然后创建一个NamesrvController。NamesrvController相当于NameServer的一个中央控制器类。
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
controller.getConfiguration().registerConfig(properties);
return controller;
}
2.1 new NamesrvController创建控制器
createNamesrvController方法中,会根据NamesrvConfig和NettyServerConfig调用NamesrvController的构造器创建一个实例。
这个构造器源码也比较简单,就是初始化一些属性。
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(log, this.namesrvConfig, this.nettyServerConfig);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
这里面还会初始化一个brokerHousekeepingService,他是一个ChannelEventListener的实现,主要用于主要用于监听Broker的Channel通道关闭事件,并在事件触发时调用RouteInfoManager#onChannelDestroy清除路由信息。
public class BrokerHousekeepingService implements ChannelEventListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final NamesrvController namesrvController;
public BrokerHousekeepingService(NamesrvController namesrvController) {
this.namesrvController = namesrvController;
}
@Override
public void onChannelConnect(String remoteAddr, Channel channel) {
}
@Override
public void onChannelClose(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelException(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
@Override
public void onChannelIdle(String remoteAddr, Channel channel) {
this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
}
}
3 start启动NamesrvController
在创建NamesrvController之后,调用start方法对其进行启动,实际上就是启动NameServer中的NettyServer服务。
该方法主要做了三件事:
-
调用initialize方法初始化NettyServer。创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等操作。 -
对JVM添加关闭钩子方法,在NameServer的JVM关闭之前执行,关闭NameServerController中线程池,NettyServer进行关闭进行一些内存清理、对象销毁等操作。 -
调用start方法启动NettyServer,并进行监听。
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
controller.start();
return controller;
}
3.1 initialize初始化NettyServer
该方法用于初始化NettyServer。将会执行创建netty远程服务,初始化Netty线程池,注册请求处理器,配置定时任务,用于扫描并移除不活跃的Broker等初始化操作。
initialize的大概步骤为:
-
加载KV配置并存储到kvConfigManager内部的configTable属性中。 -
创建NameServer的netty远程服务。remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。 -
创建netty远程通信执行器线程池remotingExecutor,线程数默认8,线程名以RemotingExecutorThread_为前缀,用作默认的请求处理线程池。 -
注册默认请求处理器DefaultRequestProcessor到remotingServer中。 -
启动两个定时任务。其中一个每隔十秒钟检测不活跃的Broker并清理相关路由信息,这是一个核心知识点,另一个任务则是每隔十分钟打印kv配置信息。
public boolean initialize() {
this.kvConfigManager.load();
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
3.1.1 创建NettyRemotingServer
initialize方法的第一步是创建一个NettyRemotingServer,remotingServer是一个基于Netty的用于NameServer与Broker、Consumer、Producer进行网络通信的服务端。
NettyRemotingServer的构造器主要做了以下事:
-
创建serverBootstrap,这是etty服务端启动类,引导启动服务端。 -
创建一个公共线程池publicExecutor,线程数默认4个线程,线程名以NettyServerPublicExecutor_为前缀。用在registerProcessor方法中,在该方法注册Netty事件处理器时如果没指定线程池,则会统一使用publicExecutor来处理具体的业务,用于处理某些特定的请求业务,例如异步发送消息的回调。 -
根据是否使用epoll模型初始化Boss EventLoopGroup和Worker EventLoopGroup这两个事件循环组,线程数分别默认1个和3个线程,线程名分别以NettyEPOLLBoss_和NettyServerEPOLLSelector_为前缀。这两个线程组对于熟悉Netty的同学应该不陌生了,boss用于处理连接事件,worker用于处理读写事件。
- 如果是linux内核,并且指定开启epoll,并且系统支持epoll,才会使用EpollEventLoopGroup类型,否则使用NioEventLoopGroup类型。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
loadSslContext();
}
3.1.2 registerProcessor注册默认请求处理器
该方法将remotingExecutor绑定到DefaultRequestProcessor上,用作默认的请求处理线程池,并且将DefaultRequestProcessor注册到remotingServer中。
当有请求到来时,首先根据请求的业务code,获取对应的RequestProcessor进行处理,如果该Code没有注册的RequestProcessor,则采用DefaultRequestProcessor处理(逻辑位于NettyRemotingAbstract#processRequestCommand方法中)。
private void registerProcessor() {
if (namesrvConfig.isClusterTest()) {
this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
this.remotingExecutor);
} else {
this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
}
}
设置为NettyRemotingServer的defaultRequestProcessor属性:
@Override
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
3.1.3 启动定时任务
在创建了remotingServer并且注册了默认请求处理器之后,将会创建两个定时任务:
-
其中一个首次启动延迟5秒执行,此后每隔10秒执行一次扫描无效的Broker,并清除Broker相关路由信息的任务。 -
另一个首次启动延迟1分钟执行,此后每隔10分钟执行一次打印kvConfig配置信息的任务。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
我们主要关注第一个定时任务,它是非常重要的!但是现在我们不会分析具体的源码,后面会专门的分析。
3.2 注册销毁钩子函数
initialize方法执行完毕之后,对JVM添加关闭钩子方法,在NameServer的JVM关闭之前执行,关闭NameServerController中线程池,NettyServer进行关闭进行一些内存清理、对象销毁等操作。
内部主要是调用controller的shutdown方法:
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
3.3 start启动NettyServer
在初始化NettyServer完毕并且注册了钩子函数之后,将会启动NettyServer。
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
内部会调用remotingServer(NettyRemotingServer)的start方法,该方法才是核心方法,将会启动一个Netty服务端。实际上NettyRemotingServer类属于remoting远程通信模块,因此它是NameServer和Broker共用的进入网络通信类。
@Override
public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog())
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.getServerSocketSndBufSize() > 0) {
log.info("server set SO_SNDBUF to {}", nettyServerConfig.getServerSocketSndBufSize());
childHandler.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize());
}
if (nettyServerConfig.getServerSocketRcvBufSize() > 0) {
log.info("server set SO_RCVBUF to {}", nettyServerConfig.getServerSocketRcvBufSize());
childHandler.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize());
}
if (nettyServerConfig.getWriteBufferLowWaterMark() > 0 && nettyServerConfig.getWriteBufferHighWaterMark() > 0) {
log.info("server set netty WRITE_BUFFER_WATER_MARK to {},{}",
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark());
childHandler.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
nettyServerConfig.getWriteBufferLowWaterMark(), nettyServerConfig.getWriteBufferHighWaterMark()));
}
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
Rocketmq的远程通信是基于Netty的,从上面的start方法中可以明显的看出来典型的netty服务的启动流程,通过serverBootstrap的一系列方法帮助设置启动配置,包括配置parentGroup和childGroup,配置IO模型,配置连接属性,配置服务端口固定为9876,配置用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作的defaultEventExecutorGroup,配置真正处理请求的serverHandler等等。
我们主要看上面几个handler。如果Channel是出现了连接/读/写等事件,是会在Pipeline的ChannelHandler之间流转,当然这是Netty架构的特性,我们不做过多讲述,所以说要想看懂这些框架的底层源码,我们最好能够明白Netty的主要源码和流程,因为很多的框架的通信都是基于Netty这个框架的。
上面设置的Handler包括:
-
handshakeHandler:这是用来处理TSL协议握手的Handler,无需过多关注。 -
NettyEncoder和NettyDecoder:这是RocketMQ自定义的请求解码和编码器,处理报文的编解码操作。负责网络传输数据和 RemotingCommand 之间的编解码。 -
IdleStateHandler:这是Netty自带的心跳管理器,主要是用来检测远端是否存活。默认情况下,测试端一定时间内未接受到被测试端消息和一定时间内向被测试端发送消息的超时时间为120秒。心跳检测就是在这里。 -
connectionManageHandler:处理连接事件的handler,负责连接的激活、断开、超时、异常等事件。 -
serverHandler:处理读写事件的handler,简单的说真正处理业务请求的,这是重点,后面会在专门学习通信的时候解析。
最后,通过serverBootstrap#sync方法启动netty服务端的,方法执行完毕后,NameServer启动完毕,此时NameServer就可以对外提供远程通信服务了,然后就可以启动Broker服务了。
start方法的最后,还启动了一个定时任务scanResponseTable。这里主要是用于处理通信时的异常情况。RocketMQ会将请求结果封装为一个ResponseFuture并且存入responseTable中。那么在发送消息时候,如果遇到服务端没有response返回给客户端或者response因网络而丢失等异常情况。此时可能造成responseTable中的ResponseFuture累积,因此该任务会每隔一秒扫描一次responseTable,将超时的ResponseFuture直接移除,并且执行这些超时ResponseFuture的回调。具体的源码和逻辑,我们同样会在RocketMQ的请求和相应部分解析。
4 RocaktMQ的RPC通信模型设计初探
从上面的源码也能大概看出来NettyRemotingServer的线程模型,实际上这就是RocaktMQ的RPC通信模型,RPC设计是RocketMQ源码中的精华,其中有非常好的思想可以让我们学习,如何实现高性能的RPC通信。
因为RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。RocketMQ中RPC通信的通过1+N+M1+M2的Reactor多线程实现:
-
1就是parentGroup,即eventLoopGroupBoss,内部只包含1个线程,线程名以NettyNIOBoss_为前缀。负责监听 TCP网络连接请求事件,并建立好连接。随后将连接交给childGroup。 -
N就是childGroup,即eventLoopGroupSelector,内部默认包含3个线程线程名以NettyServerNIOSelector_为前缀。用于监听IO读写事件,并负责从网络读取数据。 -
M1就是defaultEventExecutorGroup,当线程数默认8个线程,线程名以NettyServerCodecThread_为前缀。主要用于执行在真正执行业务逻辑之前需要进行的SSL验证、编解码、空闲检查、网络连接管理等操作。 -
M2是什么呢?我们上面说执行的业务请求的ChannelHandler是serverHandler,这个serverHander的源码如果进入看就会知道,它实际上也是一个分发请求的handler,也就是说serverHandler最终会将请求根据不同的消息类型code分发到不同的process线程池处理具体的源码我们后面会分析。不同类型的请求可能会使用不同的process线程池,这就是M2。当然我如果某个也i按没有设置线程池,那就会使用默认的process线程池,即前面初始化的defaultRequestProcessor内部的remotingExecutor线程池(默认8个线程)。
可以看到,从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽,这就是RocketMQ的PRC通信设计。
下面是官方的描述:Reactor多线程设计:
上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
5 NameServer启动流程总结
通过对NameServer模块启动源码的整体通读,其实我们可以发现NameServer启动的源码和原理还是比较简单的,就是进行一些初始化的配置的读取,然后最重要的是启动一个基于Netty的服务端,端口为9876。
在启动的时候,还会启动一些定时任务,比如printAllPeriodically每一分钟打印kv信息,比如scanResponseTable每一秒钟清除无效ResponseFuture,最重要的就是scanNotActiveBroker这个定时任务,该定时任务每隔10秒执行一次扫描,检测无效的Broker,并清除Broker相关路由信息的任务,用于实现Broker相关数据的更新。
后面我们会学习Broker的启动流程源码,它的源码将会更加复杂!NameServer的启动源码仅仅是开胃菜而已!
如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!
|