一、前言
remoting是RocketMQ的底层通信模块,RocketMQ底层通讯是使用Netty来实现的。本文通过对remoting源码进行分析,来说明remoting如何实现高性能通信的。
二、Remoting 通信模块结构
remoting 的网络通信是基于 Netty 实现,模块中类的继承关系如下:
可见通信的类继承自类RemotingService,RemotingService的定义如下:
public interface RemotingService {
// 服务启动
void start();
// 服务停止
void shutdown();
//注册RPC钩子函数
void registerRPCHook(RPCHook rpcHook);
}
RemotingServer:继承自RemotingService,定义了服务端的接口
public interface RemotingServer extends RemotingService {
/**
* 注册处理器
* @param requestCode 请求码
* @param processor 处理器
* @param executor 线程池
* 这三者是绑定关系:
* 根据请求的code 找到处理对应请求的处理器与线程池 并完成业务处理。
*/
void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
final ExecutorService executor);
/**
* 注册缺省处理器
* @param processor 缺省处理器
* @param executor 线程池
*/
void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);
// 获取服务端口
int localListenPort();
/**
* 根据 请求码 获取 处理器和线程池
* @param requestCode 请求码
* @return
*/
Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);
/**
* 同步调用
* @param channel 通信通道
* @param request 业务请求对象
* @param timeoutMillis 超时时间
* @return 响应结果封装
*/
RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
RemotingTimeoutException;
/**
* 异步调用
* @param channel 通信通道
* @param request 业务请求对象
* @param timeoutMillis 超时时间
* @param invokeCallback 响应结果回调对象
*/
void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException,
RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;
/**
* 单向调用 (不关注返回结果)
* @param channel 通信通道
* @param request 业务请求对象
* @param timeoutMillis 超时时间
*/
void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException;
}
从上面的代码可以看出,RemotingServer的主要功能是注册请求协议处理器、请求调用方法。
NettyRemotingServer::服务端的实现类,实现了 RemotingServer 接口,继承 NettyRemotingAbstract 抽象类
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
// Netty服务端启动器
private final ServerBootstrap serverBootstrap;
// worker组
private final EventLoopGroup eventLoopGroupSelector;
// boss组
private final EventLoopGroup eventLoopGroupBoss;
// Netty服务端配置信息类
private final NettyServerConfig nettyServerConfig;
// 公共线程池 (在注册协议处理器的时候,若未给处理器指定线程池,那么就是用该公共线程池)
private final ExecutorService publicExecutor;
// Netty Channel 特殊状态监听器
private final ChannelEventListener channelEventListener;
// 定时器 (功能: 扫描 responseTable表,将过期的responseFuture移除)
private final Timer timer = new Timer("ServerHouseKeepingService", true);
// 用于在pipeline指定handler中 执行任务的线程池
private DefaultEventExecutorGroup defaultEventExecutorGroup;
// 服务端绑定的端口
private int port = 0;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
// 用于处理 SSL 握手连接的处理器
private HandshakeHandler handshakeHandler;
// 协议编码 处理器
private NettyEncoder encoder;
// 连接管理 处理器
private NettyConnectManageHandler connectionManageHandler;
// 核心业务 处理器
private NettyServerHandler serverHandler;
// 参数1: nettyServerConfig Netty服务端配置信息
// 参数2: channelEventListener channel特殊状态监听器
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
// 调用父类 就是通过 Semaphore 设置请求并发限制
// 1. 设置 单行请求的并发限制
// 2. 设置 异步请求的并发限制
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
// 创建公共线程池 publicExecutor 线程数量为:4
int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
if (publicThreadNums <= 0) {
publicThreadNums = 4;
}
this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
}
});
// 下面就是根据操作系统平台来选择创建 bossGroup 和 workGroup的逻辑
if (useEpoll()) {
this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
}
});
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
// 加载SSL连接的相关方法 (不在本篇的分析范围内)
loadSslContext();
}
}
NettyRemotingServer当中重要的参数:
- 父类的属性?semaphoreOneway?, **semaphoreAsync ** 用来控制请求并发量的
- serverBootstrap?Netty服务器启动器
- nettyServerConfig?Netty服务器配置信息
- channelEventListener?Netty Channel状态监听器
- eventLoopGroupSelector?worker组
- eventLoopGroupBoss?boss组
NettyRemotingServer的启动
// 启动Netty 服务器
@Override
public void start() {
// Netty pipeline中的指定 handler 采用该线程池执行
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 初始化 处理器 handler
// 1. handshakeHandler SSL连接
// 2. encoder 编码器
// 3. connectionManageHandler 连接管理器处理器
// 4. serverHandler 核心业务处理器
prepareSharableHandlers();
// 下面就是 Netty 创建服务端启动器的固定流程
ServerBootstrap childHandler =
// 配置服务端 启动对象
// 配置工作组 boss 和 worker 组
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
// 设置服务端ServerSocketChannel 类型
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 设置服务端ch选项
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
// 设置客户端ch选项
.childOption(ChannelOption.TCP_NODELAY, true)
// 设置 接收缓冲区 和 发送缓冲区的 大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 设置服务器端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 初始化 客户端ch pipeline 的逻辑, 同时指定了线程池为 defaultEventExecutorGroup
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
// 客户端开启 内存池,使用的内存池 是 PooledByteBufAllocator.DEFAULT
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 服务器 绑定端口
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
// 条件成立: channel状态监听器不为空, 则创建 网络异常事件执行器
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 提交定时任务,每一秒 执行一次
// 扫描 responseTable 表, 将过期的 responseFuture 移除
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
上述代码 基本上就是 模板Netty创建服务端的代码,主要做了如下几件事:
- 启动Netty服务器
- 开启 channel状态监听线程
- 开启 扫描?responseFuture?的定时任务
通过这个结构图可以看出,RocketMQ 在 Netty 原生的多线程 Reactor 模型上做了一系列的扩展和优化,使用多个线程池来处理数据
? ? 1、一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP 网络连接请求,建立好连接,创建 SocketChannel,并注册到 selector 上。 ? ? ?RocketMQ 的源码中会自动根据 OS 的类型选择 NIO 和 Epoll,也可以通过参数配置,然后监听真正的网络数据。 ? ? 2、拿到网络数据后,再丢给 Worker 线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3), ? ? 3、在真正执行业务逻辑之前需要进行 SSL 验证、编解码、空闲检查、网络连接管理,这些工作交给 defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为 8 )去做。 ? ? 4、而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码 code 去 processorTable 这个本地缓存变量中找到对应的 processor,然后封装成 task 任务后,提交给对应的业务 processor 处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。
NettyRemotingAbstract:抽象类NettyRemotingAbstract是NettyRemotingServer的父类,主要定义了请求并发量、控制响应对象和各种请求处理函数。
public abstract class NettyRemotingAbstract {
// 控制 单向请求的 并发量
protected final Semaphore semaphoreOneway;
// 控制 异步请求的 并发量
protected final Semaphore semaphoreAsync;
// 响应对象映射表 (key: opaque value:responseFuture)
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);
// 请求处理器映射表 (key: requestCode value:(processor,executor) )
protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
// Netty事件监听线程池
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
// 默认的请求处理器对 包含(processor,executor)
protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;
// SSL相关
protected volatile SslContext sslContext;
// 扩展钩子
protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
}
|