Elasticsearch series articles
Deep dive into Elasticsearch (1): Setting up an Elasticsearch environment and installing the elasticsearch_head plugin
Deep dive into Elasticsearch (2): Operating ES from Spring Boot via JestClient
Source-level analysis of the ES master election process
ES uses a Bully-like algorithm for master election, while also avoiding split-brain when anomalies such as network partitions occur.
The Bully algorithm:
One of the basic leader election algorithms. It assumes every node has a unique ID and orders nodes by that ID; at any point in time, the current leader is the node with the highest ID. The algorithm is simple to implement, but it misbehaves when the leader's network is faulty or unstable. For example, if the master is overloaded and appears dead, the cluster elects the node with the second-highest ID as leader; when the original master recovers it gets elected again, and the cycle repeats.
ES deals with this by postponing elections: a new election is held only after the current master actually fails, so a live master is never voted out. Deferring elections this way makes split-brain easier to hit, so ES additionally relies on a quorum setting (discovery.zen.minimum_master_nodes) to prevent it.
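To make the combination of a Bully-style pick and quorum protection concrete, here is a minimal standalone sketch; it is not the Elasticsearch implementation, and the Node class and the minimumMasterNodes parameter are illustrative stand-ins for the real node abstraction and the discovery.zen.minimum_master_nodes setting.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Minimal, standalone sketch of a Bully-style pick guarded by a quorum check.
// "Node" and "minimumMasterNodes" are illustrative, not the ES classes/settings.
public class BullyElectionSketch {

    static class Node {
        final String id;
        Node(String id) { this.id = id; }
        @Override public String toString() { return id; }
    }

    static Optional<Node> electMaster(List<Node> visibleCandidates, int minimumMasterNodes) {
        // Split-brain guard: refuse to elect unless a quorum of master-eligible nodes is visible.
        if (visibleCandidates.size() < minimumMasterNodes) {
            return Optional.empty();
        }
        // Bully rule: the node with the "highest" id wins.
        // (ES's variant orders candidates by cluster state version first and then prefers the smallest node id.)
        return visibleCandidates.stream().max(Comparator.comparing((Node n) -> n.id));
    }

    public static void main(String[] args) {
        List<Node> seen = Arrays.asList(new Node("node-1"), new Node("node-3"), new Node("node-2"));
        System.out.println(electMaster(seen, 2));                // Optional[node-3]
        System.out.println(electMaster(seen.subList(0, 1), 2));  // Optional.empty -> no election this round
    }
}
```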
The election flow lives mainly in ZenDiscovery and consists of:

- electing a temporary master; if the local node wins, it waits for the other nodes to confirm it as master;
- if another node wins, the local node tries to join the cluster and then starts fault detection.

The main entry point is ZenDiscovery#innerJoinCluster, and the temporary-master election happens in **findMaster()**, so that is where we start.
Electing a temporary master

Let's walk through the code in order:
```java
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }

    final DiscoveryNode localNode = clusterService.localNode();

    // the ping responses gathered from other nodes never contain the local node
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

    // add the local node's own response as the last entry
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, clusterService.state()));

    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

    // nodes that other nodes currently consider to be master (the local node is excluded
    // so that it cannot elect itself without confirmation from anyone else)
    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }

    // every master-eligible node becomes a candidate
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }

    if (activeMasters.isEmpty()) {
        // nobody claims to be master yet: elect among the candidates, if there are enough of them
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            return null;
        }
    } else {
        // some nodes already see an active master: follow that view
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}
```
```java
static List<ZenPing.PingResponse> filterPingResponses(List<ZenPing.PingResponse> fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) {
    List<ZenPing.PingResponse> pingResponses;
    if (masterElectionIgnoreNonMasters) {
        // keep only the pings that came from master-eligible nodes
        pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
    } else {
        pingResponses = fullPingResponses;
    }
    return pingResponses;
}
```
The quorum check and the election itself are implemented in ElectMasterService:

```java
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
    if (candidates.isEmpty()) {
        return false;
    }
    if (minimumMasterNodes < 1) {
        return true;
    }
    assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
        "duplicates ahead: " + candidates;
    return candidates.size() >= minimumMasterNodes;
}

public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
    assert hasEnoughCandidates(candidates);
    List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
    sortedCandidates.sort(MasterCandidate::compare);
    // the best-ranked candidate sorts first
    return sortedCandidates.get(0);
}
```
The temporary-master election flow is as follows:

1. Collect the list of all responding nodes into fullPingResponses (which does not include the local node), then append the local node to it at the end.
2. Filter fullPingResponses: if discovery.zen.master_election.ignore_non_master_pings is set to true (the default is false), nodes that are not master-eligible are filtered out.
3. Build two node lists:
   - activeMasters: the currently active masters. Iterate over the ping responses and add each node's view of who the master is (excluding the local node) to activeMasters. The local node is excluded to prevent it from electing itself when no other node can confirm the election.
   - masterCandidates: all master-eligible nodes.
4. If activeMasters is empty, run an election over masterCandidates: on success return the winner, otherwise return null. If activeMasters is not empty, pick the temporary master directly from activeMasters via tieBreakActiveMasters(), which prefers master-eligible nodes and, among those, the one with the smallest node id.
The compare() method below defines the ordering used when an election takes place. As the code shows, when there are multiple candidates the winner is chosen by cluster state version first and then by node id.
```java
public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // a higher cluster state version ranks first (a "better" candidate)
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
    if (ret == 0) {
        // tie-break by node: compareNodes() prefers master-eligible nodes, then the smaller node id
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}
```
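For intuition, the effect of this ordering (together with the node-id fallback) can be reproduced with a small standalone sketch; Candidate here is an illustrative stand-in for ElectMasterService.MasterCandidate, not the real class:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone sketch of the ordering electMaster() relies on: higher cluster state
// version first, then the smaller node id. "Candidate" is an illustrative stand-in.
public class CandidateOrderingSketch {

    static class Candidate {
        final String nodeId;
        final long clusterStateVersion;

        Candidate(String nodeId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterStateVersion = clusterStateVersion;
        }

        @Override public String toString() { return nodeId + "(v" + clusterStateVersion + ")"; }
    }

    public static void main(String[] args) {
        List<Candidate> candidates = new ArrayList<>(Arrays.asList(
                new Candidate("node-2", 10),
                new Candidate("node-3", 12),
                new Candidate("node-1", 12)));

        // cluster state version descending, then node id ascending
        candidates.sort(Comparator
                .comparingLong((Candidate c) -> c.clusterStateVersion).reversed()
                .thenComparing(c -> c.nodeId));

        // [node-1(v12), node-3(v12), node-2(v10)] -> node-1 wins: newest state, then smallest id
        System.out.println(candidates);
    }
}
```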
Confirming the master or joining the cluster

Once the temporary master has been elected, there are two branches:

- the local node was elected temporary master;
- the local node is not the temporary master.

The local node was elected temporary master
```java
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        return;
    }

    if (clusterService.localNode().equals(masterNode)) {
        // the local node won: wait for minimum_master_nodes - 1 other joins before becoming master
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        joinThreadControl.markThreadAsDone(currentThread);
                        // start fault detection against the nodes in the published state
                        nodesFD.updateNodesAndPing(state);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
        );
    } else {
        // another node was elected: join it (this branch is shown in the next section)
    }
}
```
```java
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
    final CountDownLatch done = new CountDownLatch(1);
    final ElectionCallback wrapperCallback = new ElectionCallback() {
        @Override
        public void onElectedAsMaster(ClusterState state) {
            done.countDown();
            callback.onElectedAsMaster(state);
        }

        @Override
        public void onFailure(Throwable t) {
            done.countDown();
            callback.onFailure(t);
        }
    };

    ElectionContext myElectionContext = null;
    try {
        synchronized (this) {
            assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
            myElectionContext = electionContext;
            electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
            checkPendingJoinsAndElectIfNeeded();
        }
        try {
            if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
                return;
            }
        } catch (InterruptedException e) {
        }
        failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
    } catch (Exception e) {
        logger.error("unexpected failure while waiting for incoming joins", e);
        if (myElectionContext != null) {
            failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
        }
    }
}
```
```java
private synchronized void checkPendingJoinsAndElectIfNeeded() {
    assert electionContext != null : "election check requested but no active context";
    final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
    if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
        // not enough joins yet: keep accumulating
    } else {
        // quorum reached: publish the cluster state and become master
        electionContext.closeAndBecomeMaster();
        electionContext = null; // clear the election context once the election is done
    }
}
```
When the local node is elected master, it goes through the following steps:

- Wait for the other master-eligible nodes to vote (send join requests); once the minimum number of votes is reached, the node becomes master (sketched below).
- If this times out (30s by default) or an exception occurs during the election, abandon this round and start a new election.
- After a successful election, publish the cluster state and clear the election context.
- Start node fault detection, probing every node's state periodically (every 1s); if the node count falls below the minimum number of votes, give up the master role.
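The waiting step can be approximated with a small self-contained sketch built around a CountDownLatch, in the same spirit as waitToBeElectedAsMaster() above; the class and method names here are illustrative, not the ES API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of "wait until enough master-eligible nodes have joined, or time out".
// requiredJoins corresponds to minimum_master_nodes - 1, because the local node counts as one vote.
public class JoinWaitSketch {

    private final CountDownLatch pendingJoins;

    public JoinWaitSketch(int requiredJoins) {
        this.pendingJoins = new CountDownLatch(requiredJoins);
    }

    // Called once per join request received from another master-eligible node.
    public void onJoin(String joiningNodeId) {
        pendingJoins.countDown();
    }

    // true  -> enough joins arrived in time: publish the cluster state and become master
    // false -> timed out: give up this round and start a new election
    public boolean waitToBeElected(long timeout, TimeUnit unit) throws InterruptedException {
        return pendingJoins.await(timeout, unit);
    }
}
```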
The local node is not the master
This is the else branch of innerJoinCluster() that was elided above:

```java
nodeJoinController.stopElectionContext(masterNode + " elected");

// send a join request to the elected master
final boolean success = joinElectedMaster(masterNode);

final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new LocalClusterUpdateTask() {
    @Override
    public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
        if (!success) {
            // failed to join: restart the join thread and try again
            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            return unchanged();
        }
        if (currentState.getNodes().getMasterNode() == null) {
            // the elected master has not yet published a cluster state naming itself master
            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            return unchanged();
        }
        if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
            return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
        }
        joinThreadControl.markThreadAsDone(currentThread);
        return unchanged();
    }

    @Override
    public void onFailure(String source, @Nullable Exception e) {
        logger.error("unexpected error while trying to finalize cluster join", e);
        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
    }
});
```
```java
private class NewPendingClusterStateListener implements PublishClusterStateAction.NewPendingClusterStateListener {

    @Override
    public void onNewClusterState(String reason) {
        processNextPendingClusterState(reason);
    }
}
```
Taking part in the election as a non-master node is comparatively simple:

- Stop accepting joins from other nodes.
- Send a join request to the elected master (a sketch of this step follows below).
- When the master finishes the election and publishes the cluster state, complete the join by applying that state.

The one thing worth noting is that when the cluster state is received, fault detection against the master is started automatically.
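A hedged sketch of the join step as a plain retry loop is shown below; Transport.sendJoinRequest() is a hypothetical placeholder for the real transport call, and the retry parameters only mirror the spirit of discovery.zen.join_retry_attempts / discovery.zen.join_retry_delay:

```java
import java.util.concurrent.TimeUnit;

// Sketch of the non-master path: retry the join request a few times before giving up.
public class JoinMasterSketch {

    interface Transport {
        void sendJoinRequest(String masterNodeId) throws Exception; // hypothetical placeholder
    }

    static boolean joinElectedMaster(Transport transport, String masterNodeId,
                                     int retryAttempts, long retryDelayMillis) {
        for (int attempt = 0; attempt < retryAttempts; attempt++) {
            try {
                transport.sendJoinRequest(masterNodeId);
                return true; // join acknowledged; now wait for the published cluster state
            } catch (Exception e) {
                try {
                    TimeUnit.MILLISECONDS.sleep(retryDelayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false; // give up and let the caller start a new election round
    }
}
```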
Node fault detection

Fault detection is what triggers new elections and removes unavailable nodes. In ES it comes in two flavors: NodesFaultDetection, run by the master to detect failed nodes, and MasterFaultDetection, run by the other nodes to detect a failed master. Here we focus on the master side, because the detection started by the master contains the check that keeps ES from split-brain.

NodesFaultDetection is started after the master publishes the cluster state. The code below runs when a failed node has to be removed from the cluster state; the parts unrelated to split-brain are omitted:
```java
@Override
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
    final ClusterState remainingNodesClusterState = remainingNodesClusterState(currentState, remainingNodesBuilder);
    final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
    if (electMasterService.hasEnoughMasterNodes(remainingNodesClusterState.nodes()) == false) {
        // the remaining master-eligible nodes no longer form a quorum: step down and rejoin
        final int masterNodes = electMasterService.countMasterNodes(remainingNodesClusterState.nodes());
        rejoin.accept(LoggerMessageFormat.format("not enough master nodes (has [{}], but needed [{}])",
                masterNodes, electMasterService.minimumMasterNodes()));
        return resultBuilder.build(currentState);
    } else {
        return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
    }
}
```
The master pings every node in the cluster once per second. When a probe fails, it re-checks how many nodes are still alive; if the count drops below the quorum, the current master gives up its role and a new election round begins.
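As a rough model of that behavior, the following standalone sketch runs a periodic quorum check; pingLiveMasterNodes and stepDownAndRejoin are hypothetical placeholders, not the ES API:

```java
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the master-side check: ping the cluster periodically and step down when
// the number of reachable master-eligible nodes drops below the quorum.
public class MasterQuorumWatchdogSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public void start(Supplier<Set<String>> pingLiveMasterNodes, // ids of reachable master-eligible nodes
                      int minimumMasterNodes,
                      Runnable stepDownAndRejoin) {
        scheduler.scheduleAtFixedRate(() -> {
            int live = pingLiveMasterNodes.get().size();
            if (live < minimumMasterNodes) {
                // Not enough master-eligible nodes remain: give up mastership and re-elect.
                // This keeps a minority partition from holding on to a stale master.
                stepDownAndRejoin.run();
            }
        }, 1, 1, TimeUnit.SECONDS); // ES's default fault-detection ping interval is 1s
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}
```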