IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> ZooKeeper客户端源码(二)——向服务端发起请求(顺序响应+同步阻塞+异步回调) -> 正文阅读

[网络协议]ZooKeeper客户端源码(二)——向服务端发起请求(顺序响应+同步阻塞+异步回调)

首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇源码基于ZooKeeper3.7.0版本。

向服务端发起请求

一、向服务端发起请求

客户端与服务端通信的最小单元是Packet。所有请求在发送给服务端之前,都需要先构建一个Packet,再将Packet提交给请求处理队列outgoingQueue并唤醒SendThread线程,最后处理写事件,从outgoingQueue中取出Packet,将其序列化写入网络发送缓冲区。

1、构建协议包

Packet中包含请求头、请求体、响应头、响应体、本地回调函数、watcher注册等信息。

(1)请求体和响应体

不同的请求API有不同的请求体和响应体,比如getData的请求体是GetDataRequest,响应体是GetDataResponsesetData的请求体是SetDataRequest,响应体是SetDataResponse

如下是不同请求体和响应体的类关系图:

Record-Request.drawio

Record-Response.drawio

如下图是常见的几个请求体和响应体的内容结构:

请求体和响应体内容

(2)请求头

请求头RequestHeader定义了操作类型OpCode和请求序号xid

  • 最常见OpCodecreate=1delete=2exists=3getData=5setData=6ping=11等,详细参考org.apache.zookeeper.ZooDefs.OpCode
  • xid用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序。正常从1开始自增,但是也有几个特殊的xid定义,NOTIFICATION_XID=-1 watcher通知信息,PING_XID=-2心跳请求,AUTHPACKET_XID=-4授权数据包请求,SET_WATCHES_XID=-8设置watcher请求。

根据协议规定,除非是“会话创建”请求,其他所有的客户端请求都会带上请求头。

(3)getData源码示例

getData源码为例,其他类似:

ZooKeeper.getData

2、发送数据包

(1)提交给outgoingQueue

构建好Packet,就提交给outgoingQueue队列,然后通知SendThread线程:

ClientCnxn.queuePacket

(2)SendThread处理写事件

SendThread线程轮询SelectionKey列表,处理写事件:

ClientCnxnSocketNIO.doIO

除了会话建立请求、心跳请求,其他正常请求发送完毕后,都需要添加到pendingQueue队列,其目的是按顺序处理响应。

(3)网络包序列化

真正要发给服务端的只有请求头和请求体以及长度等少量信息。

请求协议组成

如下是Packet序列化过程:

ClientCnxn.Packet.createBB

发送的网络包,需要序列化为byte数组,而ZooKeeper并没有使用多么高深的序列化技术,实则还是用的Java原生的序列化和反序列化技术ByteArrayOutputStream+DataOutputStream

二、接收服务端响应

1、按顺序处理响应

正常请求,如getDatasetDatacreatedelete等的响应都需要按顺序处理。接收服务端发来的响应信息按顺序和pendingQueue队列中的Packet对比xid是否相等,相等就是同一个请求,不相等就说明顺序乱了,抛出异常。

如下是处理响应的部分源码:

// org.apache.zookeeper.ClientCnxn.SendThread#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    // 解码
    replyHdr.deserialize(bbia, "header");
    // 暂时省略 Xid事件的处理
    
    // 必须按顺序处理响应,pendingQueue 按顺序出队列
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
        }
        // 从 pendingQueue 中取出 packet
        packet = pendingQueue.remove();
    }
    /*
     * Since requests are processed in order, we better get a response to the first request!
     */
    try {
        // 对比xid是否一致,若不一致则抛出Xid out of order异常
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
                                  + " with err " + replyHdr.getErr()
                                  + " expected Xid " + packet.requestHeader.getXid()
                                  + " for a packet with details: " + packet);
        }
        // 填充 replyHeader
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // 反序列化 response
            packet.response.deserialize(bbia, "response");
        }

        LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
    } finally {
        // 进行packet处理的收尾工作,如注册watcher、唤醒同步阻塞的主线程、触发本地回调函数等
        finishPacket(packet);
    }
}

从网络底层读取数据,然后反序列化出响应头和响应体。

响应协议

2、唤醒同步阻塞

请求的同步阻塞方式到底如何实现的呢?

以同步阻塞方式等待响应结果的请求API,都是调用方法org.apache.zookeeper.ClientCnxn#submitRequest

submitRequest

packet提交给outgoingQueue队列后,就调用packet.wait()阻塞当前线程。接收到响应,解析对比完packet后,调用finishPacket()方法进行收尾工作,如果没有设置Callback,就调用packet.notifyAll()唤醒刚才阻塞的线程。

finishPacket

3、异步回调通知

以异步回调通知响应结果,就直接调用的org.apache.zookeeper.ClientCnxn#queuePacket,直接将packet添加到outgoingQueue队列。

在调用finishPacket()方法进行收尾工作时,判断如果设置了Callback,就将packet交给EventThread进行回调通知。

首先将packet添加到EventThread线程的waitingEvents队列,然后EventThread线程循环遍历waitingEvents队列取出packet处理:

EventThread.queuePacket

EventThread.run

processEvent部分源码

三、总结与参考

一图以蔽之。

请求发起和响应过程.drawio
推荐阅读:《从Paxos到Zookeeper:分布式一致性原理与实践》倪超著。

如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 23:04:40  更:2022-03-15 23:06:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 19:18:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码