目录
文章目录
2021SC@SDUSC
一、Zab协议是什么?
二、Follower和Leader状态同步过程
三、源码解析
1.Leader.leader()
2.LearnerHandler.run()
3、Follower.followLeader()
2021SC@SDUSC
一、Zab协议是什么?
ZAB协议包括两种基本模式:崩溃恢复和消息广播。
当整个服务框架在启动过程中,或是Leader服务器出现崩溃、网络中断、重启或者集群中不存在过半的服务器与Leader服务器保持正常通信等异常情况时,ZAB就会进入崩溃恢复模式并选举产生新的Leader服务器。
当选举产生了新的Leader服务器,同时集群中已经有过半的机器与该Leader服务器完成了状态同步(数据同步)之后,ZAB协议就会退出恢复模式。
之后就进入了消息广播模式。如果此时一个同样遵循ZAB协议的加入到集群时,那么如果此时已经存在一个Leader在负责消息广播,那么该服务器就会自觉进入数据恢复模式,在完成之后加入到消息广播流程中。
ZooKeeper设计成只允许唯一的一个Leader服务器来进行事物请求的处理。Leader服务器在接收到客户端的事物请求之后,会生成对应提案并发起一轮广播;如果是其他非Leader机器接收到客户端的事物请求,那么它会将这个事物请求转发给Leader服务器。
二、Follower和Leader状态同步过程
? ? ? ? 当选举结束后,每个节点都需要根据自己的角色更新自己的状态,选举出的Leader更新自己状态为Leader,其他节点更新自己状态为Follower。
Leader更新状态入口:leader.lead()? ? ? ?
Follower更新状态入口:follower.followerLeader()
(1)follower 必须要让leader知道自己的状态:epoch、zxid、sid,必须要找出谁是leader,发送请求连接leader,并发送自己的信息给leader;leader接收到信息后,必须要返回对应的信息给follower。
(2)当leader知道follower状态之后,就确定需要做何种方式的数据同步DIFF,TRUNC,SNAP。
(3)执行数据同步。
(4)当leader接收到超过半数follower的ack之后,进入正常工作状态,集群启动完成。
三、源码解析
1.Leader.leader()
[1]、启动等待来自新跟随者的连接请求的线程。
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
[2]、执行LearnerCnxAcceptor.run()
//等待接收follower的状态同步申请 s = ss.accept();
//启动线程 LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
[3]、执行LearnerHandler.run()
2.LearnerHandler.run()
[1]、从网络中接收Follower的信息
ia.readRecord(qp, "packet");
[2]、Leader根据从Follower获取sid和旧的epoch,构建新的epoch
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
[3]、Leader向Follower发送信息(包含zxid和newEpoch)
oa.writeRecord(newEpochPacket, "packet");
[4]、接收到Follower应答的ackepoch
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
[5]、判断Leader和Follower是否要同步
boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
[6]、接收Follower的ack
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
[7]、uptodate
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
3、Follower.followLeader()
[1]、查找leader
QuorumServer leaderServer = findLeader();
[2]、连接leader
connectToLeader(leaderServer.addr, leaderServer.hostname);
[3]、向leader注册
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
[4]、读取leader返回的结果:leaderinfo
readPacket(qp);
[5]、发送ackepoch给leader(包含自己的epoch和zxid)
writePacket(ackNewEpoch,true)
[6]、发送ack给leader
case Leader.COMMIT:
fzk.commit(qp.getZxid());
Request request = fzk.pendingTxns.element();
commitProcessor.commit(request)
[7]、uptodate
processPacket()
|