由于zk客户端流程对于zk单机服务或者集群基本一致,所以先从zk客户端源码开始学习。
1、客户端启动
首先我们启动一个单机的server,然后给ZookeeperMain打上断点进入调试
//成员变量
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMain.class);
//当前支持解析的command命令如ls等
static final Map<String, String> commandMap = new HashMap<String, String>();
static final Map<String, CliCommand> commandMapCli = new HashMap<String, CliCommand>();
protected MyCommandOptions cl = new MyCommandOptions();
//当前会话的历史命令---history命令就是从这个里面取
protected HashMap<Integer, String> history = new HashMap<Integer, String>();
protected int commandCount = 0;
protected boolean printWatches = true;
protected int exitCode = ExitCode.EXECUTION_FINISHED.getValue();
protected ZooKeeper zk;
protected String host = "";
private CountDownLatch connectLatch = null;
main方法很简单首先会通过zookeeperMain对象与server建立连接,然后进入循环,解析命令行指令并发送请求
public static void main(String[] args) throws IOException, InterruptedException {
//与zkserver建立连接
ZooKeeperMain main = new ZooKeeperMain(args);
//启动命令行读取loop
main.run();
}
run方法就是从标准输入读取输入,解析成具体命令,通过向zkserver建立的连接发送对应的请求。
这里我们来看与zkserver建立连接中发生了什么
2、建立连接
zookeeperMain对象构造过程中会调用connectToZK函数,该函数会new ZooKeeperAdmin()对象,该对象是一个zookeeper抽象类的子类,zookeeper对象是对客户端的封装,包括连接的维护以及各种api接口,ZooKeeperAdmin对象创建过程中会构造一个ClientCnxn对象,ClientCnxn对象管理io-socket与服务端的交互,连接建立的最后会开启两个线程,分别运行Cnxn对象的sendthread逻辑与eventThread逻辑。
3、sendthread逻辑
下面我对sendthread的run方法进行了一些简化,可以看到该线程方法主要是建立与zkserver的连接并对连接进行维护,最后进行调用doTransport进行io读写操作。
while (state.isAlive()) {
if 连接未建立{
重新建立连接
重新初始化参数
}else{
if 连接已建立{
确定是否需要发送AuthFailed事件
}
if 读空闲超时{
抛出异常
}
if 写空闲超时{
发送心跳包
}
doTransport读写操作
}
}
4、 io操作
dotransport实际就是使用java的nio进行io读写操作(我看源码中也有netty的实现,但是调试还是原生的nio),这里也将流程进行简化
doTransport(){
select()
for{
//若可读或可写事件触发
doIO(pendingQueue, cnxn);
}
}
doIO(){
if 可读{
readResponse()----对不同事件进行处理,--若异步事件则加入到eventThread的waitqueue
}
if 可写{
findSendablePacket()
//发送包
sock.write
//加入已发送未收到返回消息的队列
pendingQueue.add
}
}
5、EventThread逻辑
eventThread的run方法很简单,这里直接贴出:
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled) {
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
可以看到EventThread对象维护了一个等待事件的阻塞队列,run方法就是循环的take()获得事件[若空事件就阻塞在这里了],然后调用processEvent对不同事件进行处理,processEvent函数里有将近200行的if-else这里就不贴出来了,主要逻辑就是根据不同的事件标志调用对应的回调函数进行处理,值得注意的这里将watch事件与其他异步处理的事件进行了区分。
那么连接到这里就建立完成了,接下来让我们发送一个 ls / 命令来看看一个请求到底是如何发送并接收的。
6、命令流程
第1小节我们说建立连接之后zkcli会循环读取命令行,这里我将这个流程简化一下:
1、zookeeperMain.run(){
//循环读取命令行并处理
while ((line = (String) readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
}
2、executeLine函数会调用processCmd函数再调用processZKCmd函数对命令进行解析
这里以ls命令为例,最终 通过这代码 cliCmd.parse(args).exec(),调用对应的请求方法
3、ls命令调用的具体方法为
LsCommand.exec(){
//调用了zk对象提供的接口
List<String> children = zk.getChildren(path, watch, stat);
}
4、zookeeper.getChildren(){
这里会首先按照通信协议构造请求头与请求体
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}
5、ClientCnxn.submitRequest(){
//构造发送包----构造过程中会将发送包插入到ClientCnxn对象维护的一个outgoingQueue中
Packet packet = queuePacket()
//loop阻塞等待消息返回信息
}
7、流程总结
通过上面的分析,我们可以知道,zkcli在收到一个命令行命令后会将这个命令加入一个待发送队列,并由单独的发送线程周期检查待发送队列是否有新消息,若有新消息则进行发送,并加入到一个待确认队列,而在接收到新消息后,从待确认队列内删除头节点[zk事务消息的发送保证了时序性,当前接到的response必定是当前的请求],生成一个事件加入到事件处理队列中。偷个网图:
?8、待解决
cli采用了两个队列完成了消息的发送,但是我没有发现消息丢失重传的逻辑,理论上一次消息丢失会导致cli一直阻塞,需要重启。不知道是不是由于每次发送消息都需要带上有序的xid所以发送失败就得用户手动重传,望理解了的大手子在评论区解答一下。
最后,本文只讨论了cli的简单请求流程,zookeeper的一个重要机制watcher还未涉及,关于这点将在下一篇文章中补足~
?
?
|