前言
今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。
?
再看看其核心属性:
//oneway方式发送的限流控制
protected final Semaphore semaphoreOneway;
//异步发送的限流控制
protected final Semaphore semaphoreAsync;
//缓存是已经发送,但是还未收到回应的map
protected final ConcurrentMapresponseTable =new ConcurrentHashMap(256);
//事件code对应的处理器
protected final HashMap<integer *="" request="" code="" ,="" pair> processorTable =
new HashMap<integer, pair>(64);
//Netty事件处理如心跳,连接,关闭等
protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
//默认处理器,上篇已经介绍使用方式
protected PairdefaultRequestProcessor; //消息处理器
//ssl相关
protected volatile SslContext sslContext;
复制代码
具体的交互流程:在RocketMQ消息队列中支持通信的方式主要同步接口(invokeSync),异步接口(invokeAsync) 直接发送(invokeOneway),以异步发送为例流程图如下所示:
?
整个RPC交互流程成分为上边几个步骤,以异步调用为例分析下整个流程接下来一起看看主要的方法,
@1客户端调用NettyRemotingClient.invokeAsync
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException
//发送的开始时间
long beginStartTime = System.currentTimeMillis();
//@2根据地址获取Channel对象
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
if (this.rpcHook != null) {
//存在rpcHook的话执行Hook函数
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout");
}
//@3 发送消息
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
} catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
复制代码
@2:根据addr获取到Netty的通道channel,如果通道不存在,就创建一个新的
@3:调用NettyRemotingAbstract.invokeAsyncImpl()方法发送异步消息
//异步发送消息
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
final InvokeCallback invokeCallback)
throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
//startTime
long beginStartTime = System.currentTimeMillis();
//获取请求Id
final int opaque = request.getOpaque();
//@4 异步发送用信息量作为并发控制
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
}
//@5 生成返回值并将回调函数设置到responseFuture存入responseTable中
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
//@6 发送消息
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
}
requestFail(opaque);
log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
} catch (Exception e) {
responseFuture.release();
log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
}
} else {
if (timeoutMillis <= 0) {
throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
} else {
String info =
String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
timeoutMillis,
this.semaphoreAsync.getQueueLength(),
this.semaphoreAsync.availablePermits()
);
log.warn(info);
throw new RemotingTimeoutException(info);
}
}
}
复制代码
@4:申请信号量,异步发送是通过信号量来控制流量,主要靠ResponseFuture类
@5:构建ResponseFuture对象并放入responseTable中,responseTable保存了ResponseFuture返回响应对象,在收到服务端的响应时通过从responseTable获取到响应对象来控制锁的释放以及回调函数的触发
@6:channel.writeAndFlush方法发送到Server端服务端接收Client端的消NettyServerHandler.channelRead0()
@7:NettyRemotingAbstract.processMessageReceived处理接收到的消息,可以看到方法内部是区分request和response的,根据流程来,进入@8
@8:处理消息请求,这个方法在上篇文章那解析过,主要内容就是根据RemotingCommand的请求业务码来匹配到相应的业务处理器,然后生成一个新的线程提交至对应的业务线程池进行异步处理,并在处理完成后发送响应给客户端,进入@9
NettyClientHandler.channelRead0和server端处理方式一样调用processMessageReceived处理进入@10rocessResponseCommand@10:首先获取到opaque,从responseTable获取到对应的ResponseFuture,异步请求的话触发回调函数,同步的话就释放锁。
小结
Rpc通信是MQ的基石,RocketMQ的通信模型是一套很标准的实时通讯模型,涉及到到网络部分很值得去学习,笔者水平有限,理解不到位的地方希望能多多指出,感谢!继续努力。
|