首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇源码基于ZooKeeper3.7.0 版本。
一、向服务端发起请求
客户端与服务端通信的最小单元是Packet 。所有请求在发送给服务端之前,都需要先构建一个Packet ,再将Packet 提交给请求处理队列outgoingQueue 并唤醒SendThread 线程,最后处理写事件,从outgoingQueue 中取出Packet ,将其序列化写入网络发送缓冲区。
1、构建协议包
Packet 中包含请求头、请求体、响应头、响应体、本地回调函数、watcher 注册等信息。
(1)请求体和响应体
不同的请求API有不同的请求体和响应体,比如getData 的请求体是GetDataRequest ,响应体是GetDataResponse ,setData 的请求体是SetDataRequest ,响应体是SetDataResponse 。
如下是不同请求体和响应体的类关系图:
如下图是常见的几个请求体和响应体的内容结构:
(2)请求头
请求头RequestHeader 定义了操作类型OpCode 和请求序号xid 。
- 最常见
OpCode 有create=1 、delete=2 、exists=3 、getData=5 、setData=6 、ping=11 等,详细参考org.apache.zookeeper.ZooDefs.OpCode 。 xid 用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序。正常从1开始自增,但是也有几个特殊的xid 定义,NOTIFICATION_XID=-1 watcher 通知信息,PING_XID=-2 心跳请求,AUTHPACKET_XID=-4 授权数据包请求,SET_WATCHES_XID=-8 设置watcher 请求。
根据协议规定,除非是“会话创建”请求,其他所有的客户端请求都会带上请求头。
(3)getData源码示例
以getData 源码为例,其他类似:
2、发送数据包
(1)提交给outgoingQueue
构建好Packet ,就提交给outgoingQueue 队列,然后通知SendThread 线程:
(2)SendThread处理写事件
SendThread 线程轮询SelectionKey 列表,处理写事件:
除了会话建立请求、心跳请求,其他正常请求发送完毕后,都需要添加到pendingQueue 队列,其目的是按顺序处理响应。
(3)网络包序列化
真正要发给服务端的只有请求头和请求体以及长度等少量信息。
如下是Packet 序列化过程:
发送的网络包,需要序列化为byte 数组,而ZooKeeper 并没有使用多么高深的序列化技术,实则还是用的Java原生的序列化和反序列化技术ByteArrayOutputStream +DataOutputStream 。
二、接收服务端响应
1、按顺序处理响应
正常请求,如getData 、setData 、create 、delete 等的响应都需要按顺序处理。接收服务端发来的响应信息按顺序和pendingQueue 队列中的Packet 对比xid 是否相等,相等就是同一个请求,不相等就说明顺序乱了,抛出异常。
如下是处理响应的部分源码:
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
finishPacket(packet);
}
}
从网络底层读取数据,然后反序列化出响应头和响应体。
2、唤醒同步阻塞
请求的同步阻塞方式到底如何实现的呢?
以同步阻塞方式等待响应结果的请求API,都是调用方法org.apache.zookeeper.ClientCnxn#submitRequest :
将packet 提交给outgoingQueue 队列后,就调用packet.wait() 阻塞当前线程。接收到响应,解析对比完packet 后,调用finishPacket() 方法进行收尾工作,如果没有设置Callback ,就调用packet.notifyAll() 唤醒刚才阻塞的线程。
3、异步回调通知
以异步回调通知响应结果,就直接调用的org.apache.zookeeper.ClientCnxn#queuePacket ,直接将packet 添加到outgoingQueue 队列。
在调用finishPacket() 方法进行收尾工作时,判断如果设置了Callback ,就将packet 交给EventThread 进行回调通知。
首先将packet 添加到EventThread 线程的waitingEvents 队列,然后EventThread 线程循环遍历waitingEvents 队列取出packet 处理:
三、总结与参考
一图以蔽之。
推荐阅读:《从Paxos到Zookeeper:分布式一致性原理与实践》倪超著。
如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。
|