IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> RocketMQ源码分析-Rpc通信模块(remoting)二 -> 正文阅读

[网络协议]RocketMQ源码分析-Rpc通信模块(remoting)二

前言

今天继续RocketMQ-Rpc通信模块(remoting)的源码分析。上一章提到了主要的start()方法执行流程,如果有不清楚的地方可以一起讨论哈,这篇文章会继续解读主要方法,按照惯例先看看NettyRemotingAbstract的类图,看类图知方法。和NettyEventExecutor以及MQ的交互流程。 按照惯例先看看NettyRemotingAbstract的类图,看类图知方法,文中会挑重要方法和主要流程解读。

?

再看看其核心属性:

//oneway方式发送的限流控制
    protected final Semaphore semaphoreOneway;
    //异步发送的限流控制
    protected final Semaphore semaphoreAsync;
//缓存是已经发送,但是还未收到回应的map
    protected final ConcurrentMapresponseTable =new ConcurrentHashMap(256);
//事件code对应的处理器
    protected final HashMap<integer *="" request="" code="" ,="" pair> processorTable =
            new HashMap<integer, pair>(64);
    //Netty事件处理如心跳,连接,关闭等
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();
//默认处理器,上篇已经介绍使用方式
    protected PairdefaultRequestProcessor; //消息处理器
    //ssl相关
    protected volatile SslContext sslContext;
复制代码

具体的交互流程:在RocketMQ消息队列中支持通信的方式主要同步接口(invokeSync),异步接口(invokeAsync) 直接发送(invokeOneway),以异步发送为例流程图如下所示:

?

整个RPC交互流程成分为上边几个步骤,以异步调用为例分析下整个流程接下来一起看看主要的方法,

@1客户端调用NettyRemotingClient.invokeAsync

public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
        throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException
//发送的开始时间
long beginStartTime = System.currentTimeMillis();
//@2根据地址获取Channel对象
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
    try {
        if (this.rpcHook != null) {
            //存在rpcHook的话执行Hook函数
            this.rpcHook.doBeforeRequest(addr, request);
        }
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsync call timeout");
        }
        //@3 发送消息
        this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
    } catch (RemotingSendRequestException e) {
        log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
        this.closeChannel(addr, channel);
        throw e;
    }
} else {
    this.closeChannel(addr, channel);
    throw new RemotingConnectException(addr);
}
复制代码

@2:根据addr获取到Netty的通道channel,如果通道不存在,就创建一个新的

@3:调用NettyRemotingAbstract.invokeAsyncImpl()方法发送异步消息

//异步发送消息
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                            final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
    //startTime    
    long beginStartTime = System.currentTimeMillis();
    //获取请求Id
    final int opaque = request.getOpaque();
    //@4 异步发送用信息量作为并发控制
    boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    if (acquired) {
        final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeoutMillis < costTime) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl call timeout");
        }
        //@5 生成返回值并将回调函数设置到responseFuture存入responseTable中
        final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
        this.responseTable.put(opaque, responseFuture);
        try {
            //@6 发送消息
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    }
                    requestFail(opaque);
                    log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                }
            });
        } catch (Exception e) {
            responseFuture.release();
            log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
            throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
        }
    } else {
        if (timeoutMillis <= 0) {
            throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
        } else {
            String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                            timeoutMillis,
                            this.semaphoreAsync.getQueueLength(),
                            this.semaphoreAsync.availablePermits()
                    );
            log.warn(info);
            throw new RemotingTimeoutException(info);
        }
    }
}
复制代码

@4:申请信号量,异步发送是通过信号量来控制流量,主要靠ResponseFuture类

@5:构建ResponseFuture对象并放入responseTable中,responseTable保存了ResponseFuture返回响应对象,在收到服务端的响应时通过从responseTable获取到响应对象来控制锁的释放以及回调函数的触发

@6:channel.writeAndFlush方法发送到Server端服务端接收Client端的消NettyServerHandler.channelRead0()

@7:NettyRemotingAbstract.processMessageReceived处理接收到的消息,可以看到方法内部是区分request和response的,根据流程来,进入@8

@8:处理消息请求,这个方法在上篇文章那解析过,主要内容就是根据RemotingCommand的请求业务码来匹配到相应的业务处理器,然后生成一个新的线程提交至对应的业务线程池进行异步处理,并在处理完成后发送响应给客户端,进入@9

NettyClientHandler.channelRead0和server端处理方式一样调用processMessageReceived处理进入@10rocessResponseCommand@10:首先获取到opaque,从responseTable获取到对应的ResponseFuture,异步请求的话触发回调函数,同步的话就释放锁。

小结

Rpc通信是MQ的基石,RocketMQ的通信模型是一套很标准的实时通讯模型,涉及到到网络部分很值得去学习,笔者水平有限,理解不到位的地方希望能多多指出,感谢!继续努力。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 21:14:47  更:2022-10-08 21:15:50 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/5 9:37:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码