前言:
前一篇文章中介绍了Zookeeper.java客户端创建的基本过程。有很多不太明确的知识点,主要是因为没有与实际场景结合起来。所以本文中,通过实际请求示例的分析来了解下其具体操作过程。
1.create()创建节点信息
通过一个示例,来展示下客户端如何发送创建节点信息
public class ZkClient {
private String connectString = "127.0.0.1:2181";
private int sessionTimeout = 3000;
ZooKeeper zkCli = null;
// 初始化客户端
@Before
public void init() throws IOException {
zkCli = new ZooKeeper(connectString, sessionTimeout, null);
}
// 创建子节点
@Test
public void createZnode() throws KeeperException, InterruptedException {
String path = zkCli.create("/hello", "world".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(path);
}
}
示例很简单,连接服务端地址为本地启动的server,创建节点直接调用zookeeper.create()方法即可,参数即路径、value信息以及节点mode即可。
2.Zookeeper.create() 方法分析? ??
public class ZooKeeper {
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
// 如果有chrootpath的话,需要拼接上
final String serverPath = prependChroot(clientPath);
// 请求头 具体内容见2.1
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
// 请求体 具体内容见2.2
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
// 将基本信息封装到请求体中
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
// 统一交由ClientCnxn发送,具体分析见3
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
}
2.1 RequestHeader请求头
public class RequestHeader implements Record {
// 唯一的id号
private int xid;
// 代表当前请求类型,创建、获取节点内容等不同类型,具体在ZooDefs.OpCode中
private int type;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
xid=a_.readInt("xid");
type=a_.readInt("type");
a_.endRecord(tag);
}
}
2.2 CreateRequest 创建节点请求体
public class CreateRequest implements Record {
// 路径信息
private String path;
// 节点值
private byte[] data;
// 权限控制信息,非重点,直接忽略
private java.util.List<org.apache.zookeeper.data.ACL> acl;
// 节点类型,具体见CreateMode
private int flags;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
{
a_.startVector(acl,"acl");
if (acl!= null) { int len1 = acl.size();
for(int vidx1 = 0; vidx1<len1; vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
a_.writeRecord(e1,"e1");
}
}
a_.endVector(acl,"acl");
}
a_.writeInt(flags,"flags");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
data=a_.readBuffer("data");
{
Index vidx1 = a_.startVector("acl");
if (vidx1!= null) { acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();
for (; !vidx1.done(); vidx1.incr()) {
org.apache.zookeeper.data.ACL e1;
e1= new org.apache.zookeeper.data.ACL();
a_.readRecord(e1,"e1");
acl.add(e1);
}
}
a_.endVector("acl");
}
flags=a_.readInt("flags");
a_.endRecord(tag);
}
}
请求头是所有请求类型所共用的,不同的请求类型会有不同的请求体,本例中创建节点使用的就是CreateRequest,删除节点就是DeleteRequest...
3.ClientCnxn.submitRequest()发送请求
public class ClientCnxn {
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// 直接交由queuePacket()处理
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration)
{
Packet packet = null;
synchronized (outgoingQueue) {
// 将请求体请求头都封装到packet中,packet具体内容见3.1
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
// 将packet添加到outgoingQueue中
outgoingQueue.add(packet);
}
}
// 唤醒ClientCnxnSocket
sendThread.getClientCnxnSocket().wakeupCnxn();
return packet;
}
}
submitRequest()方法最主要就是将请求信息封装为Packet,并添加到outgoingQueue中,后续SendThread.run()方法不断轮询outgoingQueue中的Packet,并真正发送出去。所以后续处理还是要去看SendThread.run()
3.1 Packet封装
static class Packet {
RequestHeader requestHeader;
ReplyHeader replyHeader;
Record request;
Record response;
ByteBuffer bb;
/** Client's view of the path (may differ due to chroot) **/
String clientPath;
/** Servers's view of the path (may differ due to chroot) **/
String serverPath;
boolean finished;
AsyncCallback cb;
Object ctx;
WatchRegistration watchRegistration;
public boolean readOnly;
/** Convenience ctor */
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration) {
this(requestHeader, replyHeader, request, response,
watchRegistration, false);
}
Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
Record request, Record response,
WatchRegistration watchRegistration, boolean readOnly) {
// 封装这些基本信息
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
this.response = response;
this.readOnly = readOnly;
this.watchRegistration = watchRegistration;
}
// 主要使用到的方法就是这个,将请求参数序列化的方法
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // 后续通过this.bb.putInt(this.bb.capacity() - 4);来填补len
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
} catch (IOException e) {
LOG.warn("Ignoring unexpected exception", e);
}
}
}
在这里需要说明下节点创建请求的序列化结构,通过Packet.createBB()序列化方法的展示,我们可以看到整个请求协议体格式如下:
?而每种不同的协议都有不同的请求体,有关于本例中的CreateRequest,请求体格式如下:
?
4.SendThread.run() 执行任务发送
class SendThread extends ZooKeeperThread {
// 表示对服务端的一个长连接
private final ClientCnxnSocket clientCnxnSocket;
public void run() {
...
while (state.isAlive()) {
try {
...
// 发送请求包到服务端的任务直接交由 clientCnxnSocket来处理
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
...
}
...
}
}
4.1 clientCnxnSocket.doTransport
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
// 触发连接事件
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
sendThread.primeConnection();
}
// 触发读写事件
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// 则将需要读取或者需要写出事件,交由doIO来执行,具体见4.2
doIO(pendingQueue, outgoingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
synchronized(outgoingQueue) {
// 发现outgoingQueue有可发送的数据,则注册OP_WRITE事件
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
enableWrite();
}
}
}
selected.clear();
}
}
4.2 ClientCnxnSocketNIO.doIO() 真正发送请求到服务端
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 可读,说明接收到服务端的相应数据,接收响应在5.1中分析
if (sockKey.isReadable()) {
...
}
// 有数据可写,说明outgoingQueue被添加了数据
if (sockKey.isWritable()) {
synchronized(outgoingQueue) {
Packet p = findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress());
if (p != null) {
updateLastSend();
// 默认bb为null,
if (p.bb == null) {
if ((p.requestHeader != null) &&
(p.requestHeader.getType() != OpCode.ping) &&
(p.requestHeader.getType() != OpCode.auth)) {
// 这个时候才将xid赋值,该值为int类型,唯一递增
p.requestHeader.setXid(cnxn.getXid());
}
// 创建需要发送到服务端的ByteBuffer,具体创建过程可见3.1
p.createBB();
}
// 最终在这里通过SocketChannel将ByteBuffer发送出去
sock.write(p.bb);
// 发送完成之后,将packet添加到pendingQueue,等待响应
if (!p.bb.hasRemaining()) {
sentCount++;
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
}
}
代码不算复杂,通过轮询Selector的写事件,将添加到outgoingQueue的请求,通过SocketChannel发送出去,发送出去的数据需要通过packet.createBB()来确定最终ByteBuffer。
这里需要注意的是,在发送成功之后,会将已发送的Package信息添加到pendingQueue中,等待响应。
下面我们继续分析接收响应的过程
5.接收服务端响应结果
每一种类型的请求都会接收到服务端的响应,我们来看下节点创建的请求接收响应的过程。
5.1 ClientCnxnSocketNIO.doIO() 处理可读事件
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
protected ByteBuffer incomingBuffer = lenBuffer;
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
// 可读,说明接收到服务端的相应数据
if (sockKey.isReadable()) {
// 数据读入到incomingBuffer中
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
// incomingBuffer读满的时候,则进行数据处理
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
// 读取到len字段,获取响应的总长度
if (incomingBuffer == lenBuffer) {
recvCount++;
// 分配len长度的ByteBuffer
readLength();
// 如果还未初始化,说明当前响应是创建连接的响应
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue,
cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// 读取len长度的数据到ByteBuffer中,最终交由SendThread解析,具体见5.2
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
...
}
}
5.2 SendThread.readResponse() 解析响应结果
class SendThread extends ZooKeeperThread {
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
// ReplyHeader响应头,具体内容见5.2.1
ReplyHeader replyHdr = new ReplyHeader();
// 进行header解析
replyHdr.deserialize(bbia, "header");
// 以下是各种异常的处理方案
if (replyHdr.getXid() == -2) {
...
}
if (replyHdr.getXid() == -4) {
..
}
// 监听触发,暂时忽略。后续专门来讲解
if (replyHdr.getXid() == -1) {
...
}
...
Packet packet;
synchronized (pendingQueue) {
// 每一个响应都会对应一个请求,如果pendingQueue中没有值,说明有异常
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
+ replyHdr.getXid());
}
packet = pendingQueue.remove();
}
try {
// 如果两个xid不匹配,说明顺序出了问题
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(
KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid "
+ replyHdr.getXid() + " with err " +
+ replyHdr.getErr() +
" expected Xid "
+ packet.requestHeader.getXid()
+ " for a packet with details: "
+ packet );
}
// 将响应头添加到packet.replyHeader中
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
// 将响应结果反序列化到packet.response中
// 根据不同的响应类进行对应的反序列化,比如GetDataResponse、CreateResponse等
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x"
+ Long.toHexString(sessionId) + ", packet:: " + packet);
}
} finally {
finishPacket(packet);
}
}
}
5.2.1 ReplyHeader 响应头
public class ReplyHeader implements Record {
// 客户端生成的xid,用来将请求和响应对上
private int xid;
// zookeeper服务端当前最新的事务ID
private long zxid;
// 当出现异常时,返回对应的异常码,具体的异常码都在Code中
private int err;
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeInt(xid,"xid");
a_.writeLong(zxid,"zxid");
a_.writeInt(err,"err");
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
xid=a_.readInt("xid");
zxid=a_.readLong("zxid");
err=a_.readInt("err");
a_.endRecord(tag);
}
}
5.3 读取响应结果
最终我们回到开始的地方,zookeeper.create()方法
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException
{
final String clientPath = path;
PathUtils.validatePath(clientPath, createMode.isSequential());
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
// 读取到响应头ReplyHeader,若有error,则直接抛错
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
// CreateResponse只有一个参数,就是path,创建成功后,返回对应的全路径信息
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
总结:响应的代码不算复杂,主要就是从Selector中接收读事件,将响应结果交由SendThread来处理,处理完成的响应结果封装到packet.response中。
响应体如下所示:
?
|