The Overall Approach of the Election Algorithm Source
ZooKeeper's Leader election class is FastLeaderElection; it is the engineering realization of the ZAB protocol's Leader election, so we go straight to this class for analysis. Its most important method is lookForLeader(), the core of Leader election. The method's overall flow breaks down into the following steps (a simplified sketch follows the list):
1. Pre-election preparation: some setup is needed before the election starts, e.g. creating the election objects, creating the collections used during the election, and initializing the election timeout.
2. Casting an initial vote for itself: on its first ballot the current Server proposes itself as Leader, then broadcasts its own vote to all the other Servers.
3. Exchanging votes in a loop until a Leader is elected.
4. Checking whose vote makes the better Leader: after "voting for myself", the current Server also receives vote notifications (Notification) sent by the other Servers. In a while loop it works through the received notifications and compares who is better suited to be Leader. If it finds a candidate better than its own, it updates its ballot and re-broadcasts the new vote. Every vote it examines is also recorded into a collection, to be used later for tallying.
5. Deciding whether this round can end: in fact, right after each "who is better" check, the Server immediately checks whether the election can finish, i.e. whether the vote it currently endorses has reached a quorum. If it has, the remaining housekeeping is done: the collections used during the election are cleared for next time, and the final vote is produced so the other Servers can sync data against it. If not, the next vote from another host is read from the queue and examined.
6. Cases where no election is needed: handling of a few special situations.
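Before reading the real code, here is a minimal sketch of that flow. It is only an outline under the assumption that the helper names used by FastLeaderElection (sendNotifications(), totalOrderPredicate(), termPredicate(), updateProposal(), getInitId(), getInitLastLoggedZxid(), getPeerEpoch()) behave as their names suggest; the real method handles many more corner cases, back-off logic, and the FOLLOWING/LEADING branches:

public Vote lookForLeader() throws InterruptedException {
    // (1) Preparation: the "ballot box" and the initial poll timeout.
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
    int notTimeout = finalizeWait;
    synchronized (this) {
        logicalclock.incrementAndGet();   // start a new election round
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }
    // (2) Vote for myself first, and broadcast that vote.
    sendNotifications();
    // (3) Exchange votes until this peer leaves the LOOKING state.
    while (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
        if (n == null) {
            sendNotifications();          // resend; the real code also doubles notTimeout
            continue;
        }
        // (4) Is the incoming proposal better than mine? If so, switch and re-broadcast.
        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                proposedLeader, proposedZxid, proposedEpoch)) {
            updateProposal(n.leader, n.zxid, n.peerEpoch);
            sendNotifications();
        }
        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
        // (5) Has my current proposal reached a quorum? Then wrap up and return the final vote.
        if (termPredicate(recvset,
                new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) {
            ...   // drain stragglers, set FOLLOWING/LEADING, clear collections, return vote
        }
        // (6) Notifications from peers already FOLLOWING/LEADING are handled separately.
    }
    ...
}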
Source Code Walkthrough
Note that reading the source involves two things: reading the comments on the important classes, member variables, and methods; and analyzing the business logic of the important methods. Since the full analysis is long, this installment covers the important classes and member variables.
In the source below some code has been omitted; only the parts we currently care about are listed, and their order has been adjusted for readability.
Key Classes and Member Variables
QuorumPeer
A QuorumPeer can be thought of as one ZooKeeper server ready to take part in an election, i.e. a server configured in zoo.cfg. It represents a Server with the right to vote and, in that role, has three states: LOOKING, FOLLOWING, LEADING (OBSERVING is excluded).
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider {
...
}
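The states mentioned above come from the ServerState enum nested inside QuorumPeer; it also defines the fourth state, OBSERVING, which electing servers never hold:

public enum ServerState {
    LOOKING, FOLLOWING, LEADING, OBSERVING;
}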
FastLeaderElection, the Election Class
At system initialization, every QuorumPeer object holds a FastLeaderElection object that acts on its behalf throughout the election.
public class FastLeaderElection implements Election {
    private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);
    // How long (ms) to keep waiting for further notifications before finalizing a win.
    final static int finalizeWait = 200;
    // Upper bound (ms) on the back-off between notification resends.
    final static int maxNotificationInterval = 60000;
    // The connection manager that actually moves bytes between servers.
    QuorumCnxManager manager;
    // The QuorumPeer this election object works for.
    QuorumPeer self;
    // The logical clock, i.e. the election epoch of the current round.
    AtomicLong logicalclock = new AtomicLong();
    // The proposal this server currently endorses.
    long proposedLeader;
    long proposedZxid;
    long proposedEpoch;
    static public class Notification {
        ...
        long leader;                    // sid of the proposed leader
        long zxid;                      // zxid of the proposed leader
        long electionEpoch;             // the sender's logical clock
        QuorumPeer.ServerState state;   // the sender's state
        long sid;                       // sid of the sender
        long peerEpoch;                 // epoch of the proposed leader
        ...
    }
    static public class ToSend {
        static enum mType {crequest, challenge, notification, ack}
        ToSend(mType type,
               long leader,
               long zxid,
               long electionEpoch,
               ServerState state,
               long sid,
               long peerEpoch) {
            ...
        }
        ...
    }
    static ByteBuffer buildMsg(int state,
                               long leader,
                               long zxid,
                               long electionEpoch,
                               long epoch) {
        byte requestBytes[] = new byte[40];
        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
        requestBuffer.clear();
        requestBuffer.putInt(state);
        requestBuffer.putLong(leader);
        requestBuffer.putLong(zxid);
        requestBuffer.putLong(electionEpoch);
        requestBuffer.putLong(epoch);
        requestBuffer.putInt(Notification.CURRENTVERSION);
        return requestBuffer;
    }
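The 40-byte layout explains the magic numbers that show up later in WorkerReceiver: 4 (state) + 8 (leader) + 8 (zxid) + 8 (electionEpoch) = 28 bytes is the legacy format, adding 8 (peer epoch) gives 36, and the trailing 4-byte version makes 40. As a reading aid, here is a hypothetical decode helper (not in the real source) that reads the fields back in the same order, which is essentially what WorkerReceiver does below:

// Hypothetical helper, mirroring buildMsg()'s write order.
static void decode(ByteBuffer b) {
    int state          = b.getInt();    // offset 0
    long leader        = b.getLong();   // offset 4
    long zxid          = b.getLong();   // offset 12
    long electionEpoch = b.getLong();   // offset 20 -- legacy messages end here (28 bytes)
    long peerEpoch     = b.getLong();   // offset 28
    int version        = b.getInt();    // offset 36
}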
    LinkedBlockingQueue<ToSend> sendqueue;
    LinkedBlockingQueue<Notification> recvqueue;
    Messenger messenger;
    protected class Messenger {
        class WorkerReceiver extends ZooKeeperThread {
            ...
        }
        class WorkerSender extends ZooKeeperThread {
            ...
        }
        WorkerSender ws;
        WorkerReceiver wr;
        Messenger(QuorumCnxManager manager) {
            this.ws = new WorkerSender(manager);
            Thread t = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
            this.wr = new WorkerReceiver(manager);
            t = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }
        void halt(){
            this.ws.stop = true;
            this.wr.stop = true;
        }
    }
    ...
    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }
    ...
    public Vote lookForLeader() throws InterruptedException {
        ...
    }
    ...
}
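How do QuorumPeer, FastLeaderElection and QuorumCnxManager get connected in the first place? The following is a condensed sketch of QuorumPeer.createElectionAlgorithm() for electionAlg=3 (the default, i.e. FastLeaderElection); the exact wiring varies slightly between 3.4.x versions and the legacy algorithms are omitted:

protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;
    switch (electionAlgorithm) {
    // ... legacy election algorithms omitted ...
    case 3:
        // One connection manager per peer; its listener accepts
        // connections from the other servers.
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            listener.start();
            // The election object is handed the connection manager
            // through which it will send and receive votes.
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    }
    return le;
}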
QuorumCnxManager, the Connection Manager
For QuorumCnxManager we only need a rough idea of what the class does.
public class QuorumCnxManager {
    ...
    // One SendWorker per sid of the servers we are connected to.
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    // One outbound message queue per sid.
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    // The most recently sent message: for each sid, the last message sent to it is retained here.
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
    // All messages received from the other servers.
    public final ArrayBlockingQueue<Message> recvQueue;
    ...
}
This class maintains the TCP connections between the current Server and the other Servers, and guarantees that any two servers share exactly one connection. It also maintains a Map, queueSendMap, whose key is the ServerId of another Server in the cluster and whose value is a message queue holding the messages still to be sent to that Server, so a message whose send fails keeps its copy in the queue. One point worth noting: these locally maintained message queues make it possible to judge whether the current Server's connection to the rest of the cluster is healthy.
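Here is a sketch of how a message enters queueSendMap, condensed from QuorumCnxManager.toSend(); SEND_CAPACITY, addToSendQueue(), addToRecvQueue() and connectOne() are names from the actual class, but the queue-creation logic is simplified here:

public void toSend(Long sid, ByteBuffer b) {
    if (self.getId() == sid) {
        // A message addressed to myself is looped straight back
        // into the receive queue; no socket is involved.
        b.position(0);
        addToRecvQueue(new Message(b.duplicate(), sid));
    } else {
        // Otherwise queue it for the SendWorker responsible for that sid,
        // creating the per-sid queue on first use, then make sure a
        // connection to that server exists.
        ArrayBlockingQueue<ByteBuffer> bq = new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY);
        ArrayBlockingQueue<ByteBuffer> old = queueSendMap.putIfAbsent(sid, bq);
        addToSendQueue(old != null ? old : bq, b);
        connectOne(sid);
    }
}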
QuorumCnxManager also has two inner thread classes, SendWorker and RecvWorker, responsible for sending and receiving messages respectively. Their constructors are as follows:
SendWorker(Socket sock, Long sid) {
    super("SendWorker:" + sid);
    this.sid = sid;
    this.sock = sock;
    recvWorker = null;
    try {
        dout = new DataOutputStream(sock.getOutputStream());
    } catch (IOException e) {
        LOG.error("Unable to access socket output stream", e);
        closeSocket(sock);
        running = false;
    }
    LOG.debug("Address of remote peer: " + this.sid);
}
RecvWorker(Socket sock, Long sid, SendWorker sw) {
    super("RecvWorker:" + sid);
    this.sid = sid;
    this.sock = sock;
    this.sw = sw;
    try {
        din = new DataInputStream(sock.getInputStream());
        // No read timeout: block until data arrives or the connection breaks.
        sock.setSoTimeout(0);
    } catch (IOException e) {
        LOG.error("Error while accessing socket for " + sid, e);
        closeSocket(sock);
        running = false;
    }
}
Every SendWorker or RecvWorker carries a sid field: clearly, each sid's QuorumPeer has its own dedicated SendWorker and RecvWorker pair, responsible for sending messages to it and handling the messages received from it.
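Below is a sketch of how such a pair gets wired up once a connection to peer sid is established, condensed from QuorumCnxManager's connection-handling code:

SendWorker sw = new SendWorker(sock, sid);
RecvWorker rw = new RecvWorker(sock, sid, sw);
sw.setRecv(rw);                     // the two workers know about each other

SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
    vsw.finish();                   // enforce: at most one pair (one connection) per sid
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid,
        new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));

sw.start();
rw.start();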
Next, let's see what the corresponding run() methods do. SendWorker.run():
public void run() {
    threadCnt.incrementAndGet();
    try {
        // If the outbound queue for this sid is empty, resend the last message
        // sent to it, in case the peer missed it before the connection dropped.
        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
        if (bq == null || isSendQueueEmpty(bq)) {
            ByteBuffer b = lastMessageSent.get(sid);
            if (b != null) {
                LOG.debug("Attempting to send lastMessage to sid=" + sid);
                send(b);
            }
        }
    } catch (IOException e) {
        LOG.error("Failed to send last message. Shutting down thread.", e);
        this.finish();
    }
    try {
        // Main loop: keep draining this sid's queue and sending.
        while (running && !shutdown && sock != null) {
            ByteBuffer b = null;
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap
                        .get(sid);
                if (bq != null) {
                    b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                } else {
                    LOG.error("No queue of incoming messages for " +
                              "server " + sid);
                    break;
                }
                if (b != null) {
                    // Remember what was sent last, then send it.
                    lastMessageSent.put(sid, b);
                    send(b);
                }
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for message on queue",
                        e);
            }
        }
    } catch (Exception e) {
        LOG.warn("Exception when using channel: for id " + sid + " my id = " +
                 self.getId() + " error = " + e);
    }
    this.finish();
    LOG.warn("Send worker leaving thread");
}
RecvWorker.run():
public void run() {
    threadCnt.incrementAndGet();
    try {
        while (running && !shutdown && sock != null) {
            // Messages are length-prefixed: read the length, then the payload.
            int length = din.readInt();
            if (length <= 0 || length > PACKETMAXSIZE) {
                throw new IOException(
                        "Received packet with invalid packet: "
                        + length);
            }
            byte[] msgArray = new byte[length];
            din.readFully(msgArray, 0, length);
            ByteBuffer message = ByteBuffer.wrap(msgArray);
            // Hand the raw message to QuorumCnxManager's global receive queue.
            addToRecvQueue(new Message(message.duplicate(), sid));
        }
    } catch (Exception e) {
        LOG.warn("Connection broken for id " + sid + ", my id = " +
                 self.getId() + ", error = ", e);
    } finally {
        // Take the paired SendWorker down with us and close the socket.
        LOG.warn("Interrupting SendWorker");
        sw.finish();
        if (sock != null) {
            closeSocket(sock);
        }
    }
}
As the code shows, SendWorker keeps reading the messages for its sid from the corresponding queue in the global queueSendMap and sends them to that sid.
RecvWorker, in turn, reads data off the TCP connection it owns and appends it to the tail of recvQueue.
From the run methods of QuorumCnxManager.SendWorker and QuorumCnxManager.RecvWorker it is clear that both workers merely move messages over connections established by QuorumCnxManager: the messages to be sent come from FastLeaderElection, and the messages received are consumed by FastLeaderElection. In other words, the two workers implement none of the election algorithm; they are just proxies for sending and receiving. FastLeaderElection never has to care about how to talk to another server or how to obtain its vote; it only takes messages out of, or puts messages into, the queues that QuorumCnxManager provides.
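That decoupling is easy to see on the FastLeaderElection side: broadcasting a vote is nothing more than dropping one ToSend per voting peer into sendqueue. A condensed sketch of FastLeaderElection.sendNotifications():

private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {
        long sid = server.id;
        // One notification per voting peer, carrying my current proposal.
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);
        sendqueue.offer(notmsg);   // WorkerSender takes it from here
    }
}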
FastLeaderElection.Messenger.WorkerReceiver and FastLeaderElection.Messenger.WorkerSender. Messages must flow during the election, so FastLeaderElection maintains two variables:
LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
recvqueue holds the messages received during the election; they are handed to FastLeaderElection's core method, lookForLeader(), which processes them to elect a leader. sendqueue holds the messages waiting to be sent.
At the same time, every FastLeaderElection maintains an inner Messenger instance. Messenger contains two classes implementing Runnable, WorkerReceiver and WorkerSender, which, as their names suggest, handle receiving and sending: WorkerReceiver receives messages and puts them into recvqueue to await processing, while WorkerSender takes pending messages out of sendqueue and hands them to the underlying connection manager, QuorumCnxManager, for delivery.
Now let's look at the run methods of FastLeaderElection.Messenger.WorkerSender and FastLeaderElection.Messenger.WorkerReceiver.
WorkerSender.run() is simple enough to need no commentary:
public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
            if (m == null) continue;
            process(m);
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}
void process(ToSend m) {
    // Serialize the vote into the wire format and hand it to the connection manager.
    ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                        m.leader,
                                        m.zxid,
                                        m.electionEpoch,
                                        m.peerEpoch);
    manager.toSend(m.sid, requestBuffer);
}
WorkerReceiver.run():
public void run() {
    Message response;
    while (!stop) {
        try {
            // Pull the next raw message from the connection manager.
            response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
            if (response == null) continue;
            if (!validVoter(response.sid)) {
                // The sender has no vote (e.g. an Observer): just answer
                // with our current vote instead of processing its message.
                Vote current = self.getCurrentVote();
                ToSend notmsg = new ToSend(ToSend.mType.notification,
                        current.getId(),
                        current.getZxid(),
                        logicalclock.get(),
                        self.getPeerState(),
                        response.sid,
                        current.getPeerEpoch());
                sendqueue.offer(notmsg);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Receive new notification message. My id = "
                            + self.getId());
                }
                if (response.buffer.capacity() < 28) {
                    LOG.error("Got a short response: "
                            + response.buffer.capacity());
                    continue;
                }
                // 28 bytes means the old wire format without peerEpoch and version.
                boolean backCompatibility = (response.buffer.capacity() == 28);
                response.buffer.clear();
                // Rebuild a Notification from the raw bytes, in the same
                // order in which buildMsg() wrote them.
                Notification n = new Notification();
                QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                switch (response.buffer.getInt()) {
                case 0:
                    ackstate = QuorumPeer.ServerState.LOOKING;
                    break;
                case 1:
                    ackstate = QuorumPeer.ServerState.FOLLOWING;
                    break;
                case 2:
                    ackstate = QuorumPeer.ServerState.LEADING;
                    break;
                case 3:
                    ackstate = QuorumPeer.ServerState.OBSERVING;
                    break;
                default:
                    continue;
                }
                n.leader = response.buffer.getLong();
                n.zxid = response.buffer.getLong();
                n.electionEpoch = response.buffer.getLong();
                n.state = ackstate;
                n.sid = response.sid;
                if (!backCompatibility) {
                    n.peerEpoch = response.buffer.getLong();
                } else {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("Backward compatibility mode, server id=" + n.sid);
                    }
                    // Old senders did not ship peerEpoch; derive it from the zxid.
                    n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
                }
                n.version = (response.buffer.remaining() >= 4) ?
                        response.buffer.getInt() : 0x0;
                if (LOG.isInfoEnabled()) {
                    printNotification(n);
                }
                if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                    // We are electing too: queue the notification for lookForLeader().
                    recvqueue.offer(n);
                    // If the sender is also LOOKING but lags behind our round,
                    // push our vote to it so it can catch up.
                    if ((ackstate == QuorumPeer.ServerState.LOOKING)
                            && (n.electionEpoch < logicalclock.get())) {
                        Vote v = getVote();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                v.getId(),
                                v.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                v.getPeerEpoch());
                        sendqueue.offer(notmsg);
                    }
                } else {
                    // We already follow or lead; if the sender is still LOOKING,
                    // tell it who the current leader is.
                    Vote current = self.getCurrentVote();
                    if (ackstate == QuorumPeer.ServerState.LOOKING) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Sending new notification. My id = " +
                                    self.getId() + " recipient=" +
                                    response.sid + " zxid=0x" +
                                    Long.toHexString(current.getZxid()) +
                                    " leader=" + current.getId());
                        }
                        ToSend notmsg;
                        if (n.version > 0x0) {
                            notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    current.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch());
                        } else {
                            Vote bcVote = self.getBCVote();
                            notmsg = new ToSend(
                                    ToSend.mType.notification,
                                    bcVote.getId(),
                                    bcVote.getZxid(),
                                    bcVote.getElectionEpoch(),
                                    self.getPeerState(),
                                    response.sid,
                                    bcVote.getPeerEpoch());
                        }
                        sendqueue.offer(notmsg);
                    }
                }
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted Exception while waiting for new message" +
                    e.toString());
        }
    }
    LOG.info("WorkerReceiver is down");
}