1. Cluster Architectures
1.1 Master - Slave
This is the classic master-slave replication model: after data on the master is updated, it is automatically synchronized to the slaves according to the configured replication strategy. When the master goes down, the cluster elects a new master through some distributed consensus protocol (Raft, Gossip, ZAB, etc.).
The master handles both reads and writes, while the slaves only serve reads.
This idea is used in many components, for example the MySQL master-slave architecture, the Redis master-slave architecture, and the replica mechanism in Kafka.
Common master-slave replication topologies include:
- one master, multiple slaves
- master-master replication
- cascading replication
- multiple masters, one slave
1.2 Leader - Follower
This is another common cluster architecture. The two roles have the following responsibilities:
- Leader: the leader has two main tasks
  - It is the sole scheduler and handler of transaction (write) requests, which guarantees that transactions are processed in order across the cluster.
  - It acts as the coordinator of the servers inside the cluster.
- Follower: the followers are responsible for
  - handling non-transaction (read) requests from clients and forwarding transaction requests to the Leader;
  - voting on transaction proposals, e.g. in ZooKeeper more than half of the Followers must approve before the Leader can commit the data;
  - voting in Leader elections.
This idea is also used in many components, for example ZooKeeper clusters.
2. Nacos Registry Cluster Architecture
The Nacos registry cluster follows the Leader - Follower model. A Leader node is elected while the Nacos Server cluster starts up; if the Leader goes down at runtime, the service becomes unavailable until a new Leader has been elected.
Nacos defines a top-level consistency interface, ConsistencyService, and every consistency-algorithm implementation must implement it. Internally, Nacos provides two consistency protocols, one for ephemeral nodes and one for persistent nodes. The overall class relationships are as follows: ConsistencyService has three direct sub-interfaces/implementations, EphemeralConsistencyService, PersistentConsistencyService and DelegateConsistencyServiceImpl. DelegateConsistencyServiceImpl uses the delegate pattern: it holds an EphemeralConsistencyService and a PersistentConsistencyService as member fields and decides, based on the key, which consistency protocol to use.
- EphemeralConsistencyService: the consistency protocol for ephemeral nodes. Ephemeral data lives and dies with the server session, i.e. as long as the session is still alive, the ephemeral data is not lost.
- PersistentConsistencyService: the consistency protocol for persistent nodes; it guarantees CP consistency for published data.
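For reference, here is a minimal sketch of the ConsistencyService contract, reconstructed from the methods overridden in the code below (the method signatures match those overrides; the comments are ours, not Nacos Javadoc):
public interface ConsistencyService {
    // Publish (create or update) the record stored under the given key.
    void put(String key, Record value) throws NacosException;
    // Remove the record stored under the given key.
    void remove(String key) throws NacosException;
    // Read the datum stored under the given key.
    Datum get(String key) throws NacosException;
    // Register a listener that is notified when the data under the key changes or is deleted.
    void listen(String key, RecordListener listener) throws NacosException;
    // Unregister a previously registered listener.
    void unlisten(String key, RecordListener listener) throws NacosException;
    // Whether this consistency service is currently able to serve requests.
    boolean isAvailable();
}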
The implementation of DelegateConsistencyServiceImpl looks like this:
@Service("consistencyDelegate")
public class DelegateConsistencyServiceImpl implements ConsistencyService {
@Autowired
private PersistentConsistencyService persistentConsistencyService;
@Autowired
private EphemeralConsistencyService ephemeralConsistencyService;
@Override
public void put(String key, Record value) throws NacosException {
mapConsistencyService(key).put(key, value);
}
@Override
public void remove(String key) throws NacosException {
mapConsistencyService(key).remove(key);
}
@Override
public Datum get(String key) throws NacosException {
return mapConsistencyService(key).get(key);
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
persistentConsistencyService.listen(key, listener);
ephemeralConsistencyService.listen(key, listener);
return;
}
mapConsistencyService(key).listen(key, listener);
}
@Override
public void unlisten(String key, RecordListener listener) throws NacosException {
mapConsistencyService(key).unlisten(key, listener);
}
@Override
public boolean isAvailable() {
return ephemeralConsistencyService.isAvailable() && persistentConsistencyService.isAvailable();
}
private ConsistencyService mapConsistencyService(String key) {
return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}
}
The logic of KeyBuilder.matchEphemeralKey(key) is:
public class KeyBuilder {
public static boolean matchEphemeralKey(String key) {
return matchEphemeralInstanceListKey(key);
}
public static boolean matchEphemeralInstanceListKey(String key) {
return key.startsWith(INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX);
}
}
The main job of DelegateConsistencyServiceImpl is therefore to check whether the key starts with com.alibaba.nacos.naming.iplist.ephemeral.: if it does, ephemeralConsistencyService is used, otherwise persistentConsistencyService is used.
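A small, self-contained illustration of that dispatch rule. The concrete key strings below are made-up examples for a hypothetical service demo-service; only the ephemeral prefix is taken from the text above:
public class KeyDispatchDemo {
    // Combined prefix for ephemeral instance-list keys, as stated above.
    static final String EPHEMERAL_INSTANCE_PREFIX = "com.alibaba.nacos.naming.iplist.ephemeral.";

    // Same decision as DelegateConsistencyServiceImpl.mapConsistencyService(key).
    static String route(String key) {
        return key.startsWith(EPHEMERAL_INSTANCE_PREFIX)
                ? "ephemeralConsistencyService (Distro, AP)"
                : "persistentConsistencyService (Raft, CP)";
    }

    public static void main(String[] args) {
        System.out.println(route("com.alibaba.nacos.naming.iplist.ephemeral.public##demo-service"));
        System.out.println(route("com.alibaba.nacos.naming.iplist.public##demo-service"));
    }
}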
2.1 DistroConsistencyServiceImpl
DistroConsistencyServiceImpl is the only implementation of EphemeralConsistencyService.
The ephemeral-node consistency protocol it implements is a partitioned consistency algorithm (Distro) with the following characteristics:
- The data set is divided into many blocks, and each Nacos server node is responsible for only one block of data.
- Each data block is generated, deleted and synchronized by the server responsible for it.
- Each Nacos server therefore only handles writes for a subset of the total service data.
- At the same time, every Nacos server receives the data synchronized from the other servers.
In the end, every Nacos server holds a complete copy of the data.
Partitioning / sharding / segmentation shows up in many middleware systems and low-level source code, for example the Segment in Java 7's ConcurrentHashMap, the hash slots in Redis Cluster, the partitions in Kafka, and the read/write queues in RocketMQ.
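A minimal sketch of the "each node is responsible for its own slice of the data" idea. This is not the actual DistroMapper code; it only assumes that responsibility is decided by hashing the service name onto the list of healthy servers:
import java.util.Arrays;
import java.util.List;

public class DistroResponsibilityDemo {
    // Hypothetical list of healthy servers in the cluster.
    static final List<String> SERVERS = Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848");

    // Decide which server is responsible for a given service name.
    static String responsibleServer(String serviceName) {
        int hash = Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
        return SERVERS.get(hash % SERVERS.size());
    }

    public static void main(String[] args) {
        // Only the responsible server handles writes for this service;
        // it then replicates the result to the other servers.
        System.out.println(responsibleServer("order-service"));
        System.out.println(responsibleServer("user-service"));
    }
}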
The class structure of DistroConsistencyServiceImpl is fairly simple.
2.1.1 The init() method
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
@PostConstruct
public void init() {
GlobalExecutor.submit(new Runnable() {
@Override
public void run() {
try {
load();
} catch (Exception e) {
Loggers.DISTRO.error("load data failed.", e);
}
}
});
executor.submit(notifier);
}
}
2.1.2 The load() method
It tries to load all data from the other servers and cache it locally.
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
public void load() throws Exception {
if (SystemUtils.STANDALONE_MODE) {
initialized = true;
return;
}
while (serverListManager.getHealthyServers().size() <= 1) {
Thread.sleep(1000L);
Loggers.DISTRO.info("waiting server list init...");
}
for (Server server : serverListManager.getHealthyServers()) {
if (NetUtils.localServer().equals(server.getKey())) {
continue;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync from " + server);
}
if (syncAllDataFromRemote(server)) {
initialized = true;
return;
}
}
}
public boolean syncAllDataFromRemote(Server server) {
try {
byte[] data = NamingProxy.getAllData(server.getKey());
processData(data);
return true;
} catch (Exception e) {
Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
return false;
}
}
public void processData(byte[] data) throws Exception {
if (data.length > 0) {
Map<String, Datum<Instances>> datumMap =
serializer.deserializeMap(data, Instances.class);
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
dataStore.put(entry.getKey(), entry.getValue());
if (!listeners.containsKey(entry.getKey())) {
if (switchDomain.isDefaultInstanceEphemeral()) {
Loggers.DISTRO.info("creating service {}", entry.getKey());
Service service = new Service();
String serviceName = KeyBuilder.getServiceName(entry.getKey());
String namespaceId = KeyBuilder.getNamespace(entry.getKey());
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(Constants.DEFAULT_GROUP);
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
.onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
}
}
}
for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
if (!listeners.containsKey(entry.getKey())) {
Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
continue;
}
try {
for (RecordListener listener : listeners.get(entry.getKey())) {
listener.onChange(entry.getKey(), entry.getValue().value);
}
} catch (Exception e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
continue;
}
dataStore.put(entry.getKey(), entry.getValue());
}
}
}
}
2.1.3 The Notifier thread (event notification)
Notifier is an inner class of DistroConsistencyServiceImpl that implements the event-notification mechanism: whenever data is changed or deleted, all registered Listeners are notified.
Notifier uses the producer-consumer pattern: tasks are produced into, and consumed from, the blocking queue BlockingQueue<Pair> tasks.
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.DISTRO.info("distro notifier started");
while (true) {
try {
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
int count = 0;
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
datumKey, count, action.name());
}
} catch (Throwable e) {
Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
}
2.1.4 The put(String key, Record value) method
Let's look at one of the transaction (write) methods, put(String key, Record value), as an example; the others, such as remove(String key), work in a similar way and are not analyzed here.
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {
@Override
public void put(String key, Record value) throws NacosException {
onPut(key, value);
taskDispatcher.addTask(key);
}
public void onPut(String key, Record value) {
if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
Datum<Instances> datum = new Datum<>();
datum.value = (Instances) value;
datum.key = key;
datum.timestamp.incrementAndGet();
dataStore.put(key, datum);
}
if (!listeners.containsKey(key)) {
return;
}
notifier.addTask(key, ApplyAction.CHANGE);
}
}
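To connect this to service registration: when an ephemeral instance registers, the naming module eventually calls consistencyService.put with an ephemeral instance-list key and an Instances record. The snippet below is a hedged sketch of that call site rather than a copy of the Nacos ServiceManager code; helper names such as buildInstanceListKey and setInstanceList reflect the Nacos 1.x naming module but should be treated as assumptions here:
// Hedged sketch: how registering an ephemeral instance reaches DistroConsistencyServiceImpl.put().
void registerEphemeralInstance(ConsistencyService consistencyService, String namespaceId,
                               String serviceName, List<Instance> instanceList) throws NacosException {
    // Build an ephemeral instance-list key, i.e. one starting with
    // com.alibaba.nacos.naming.iplist.ephemeral. (see mapConsistencyService above).
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, true); // true = ephemeral
    Instances instances = new Instances();
    instances.setInstanceList(instanceList); // the full instance list of the service
    // put() stores the datum locally, notifies listeners via the Notifier,
    // and hands the key to the TaskDispatcher so it is replicated to the other Distro nodes.
    consistencyService.put(key, instances);
}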
2.1.5 TaskDispatcher
The dispatcher for data-synchronization tasks.
@Component
public class TaskDispatcher {
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private DataSyncer dataSyncer;
private List<TaskScheduler> taskSchedulerList = new ArrayList<>();
private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();
@PostConstruct
public void init() {
for (int i = 0; i < cpuCoreCount; i++) {
TaskScheduler taskScheduler = new TaskScheduler(i);
taskSchedulerList.add(taskScheduler);
GlobalExecutor.submitTaskDispatch(taskScheduler);
}
}
public void addTask(String key) {
taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
}
public class TaskScheduler implements Runnable {
private int index;
private int dataSize = 0;
private long lastDispatchTime = 0L;
private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);
public TaskScheduler(int index) {
this.index = index;
}
public void addTask(String key) {
queue.offer(key);
}
public int getIndex() {
return index;
}
@Override
public void run() {
List<String> keys = new ArrayList<>();
while (true) {
try {
String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
TimeUnit.MILLISECONDS);
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("got key: {}", key);
}
if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
continue;
}
if (StringUtils.isBlank(key)) {
continue;
}
if (dataSize == 0) {
keys = new ArrayList<>();
}
keys.add(key);
dataSize++;
if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
(System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {
for (Server member : dataSyncer.getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
SyncTask syncTask = new SyncTask();
syncTask.setKeys(keys);
syncTask.setTargetServer(member.getKey());
if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
}
dataSyncer.submit(syncTask, 0);
}
lastDispatchTime = System.currentTimeMillis();
dataSize = 0;
}
} catch (Exception e) {
Loggers.DISTRO.error("dispatch sync task failed.", e);
}
}
}
}
}
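addTask(key) maps the key onto one of the cpuCoreCount TaskSchedulers via UtilsAndCommons.shakeUp, so the same key is always handled by the same scheduler thread and different keys are spread roughly evenly. Below is a simplified stand-in for that mapping; the real shakeUp implementation may differ in detail, so treat this as an assumption:
public class ShakeUpDemo {
    // Simplified stand-in for UtilsAndCommons.shakeUp: map a key to a scheduler index.
    static int shakeUp(String key, int buckets) {
        int h = key.hashCode();
        // Mix the high bits into the low bits, force a non-negative value, then take the modulo.
        return ((h ^ (h >>> 16)) & Integer.MAX_VALUE) % buckets;
    }

    public static void main(String[] args) {
        int cpuCoreCount = Runtime.getRuntime().availableProcessors();
        String key = "com.alibaba.nacos.naming.iplist.ephemeral.public##demo-service";
        // The same key always lands in the same TaskScheduler queue.
        System.out.println(shakeUp(key, cpuCoreCount));
    }
}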
2.1.6 DataSyncer
The worker that actually synchronizes data to the other nodes.
@Component
@DependsOn("serverListManager")
public class DataSyncer {
@Autowired
private DataStore dataStore;
@Autowired
private GlobalConfig partitionConfig;
@Autowired
private Serializer serializer;
@Autowired
private DistroMapper distroMapper;
@Autowired
private ServerListManager serverListManager;
private Map<String, String> taskMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
startTimedSync();
}
public void submit(SyncTask task, long delay) {
if (task.getRetryCount() == 0) {
Iterator<String> iterator = task.getKeys().iterator();
while (iterator.hasNext()) {
String key = iterator.next();
if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync already in process, key: {}", key);
}
iterator.remove();
}
}
}
if (task.getKeys().isEmpty()) {
return;
}
GlobalExecutor.submitDataSync(new Runnable() {
@Override
public void run() {
try {
if (getServers() == null || getServers().isEmpty()) {
Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
return;
}
List<String> keys = task.getKeys();
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync keys: {}", keys);
}
Map<String, Datum> datumMap = dataStore.batchGet(keys);
if (datumMap == null || datumMap.isEmpty()) {
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
return;
}
byte[] data = serializer.serialize(datumMap);
long timestamp = System.currentTimeMillis();
boolean success = NamingProxy.syncData(data, task.getTargetServer());
if (!success) {
SyncTask syncTask = new SyncTask();
syncTask.setKeys(task.getKeys());
syncTask.setRetryCount(task.getRetryCount() + 1);
syncTask.setLastExecuteTime(timestamp);
syncTask.setTargetServer(task.getTargetServer());
retrySync(syncTask);
} else {
for (String key : task.getKeys()) {
taskMap.remove(buildKey(key, task.getTargetServer()));
}
}
} catch (Exception e) {
Loggers.DISTRO.error("sync data failed.", e);
}
}
}, delay);
}
public void retrySync(SyncTask syncTask) {
Server server = new Server();
server.setIp(syncTask.getTargetServer().split(":")[0]);
server.setServePort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
if (!getServers().contains(server)) {
return;
}
submit(syncTask, partitionConfig.getSyncRetryDelay());
}
public void startTimedSync() {
GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
}
public class TimedSync implements Runnable {
@Override
public void run() {
try {
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("server list is: {}", getServers());
}
Map<String, String> keyChecksums = new HashMap<>(64);
for (String key : dataStore.keys()) {
if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
continue;
}
keyChecksums.put(key, dataStore.get(key).value.getChecksum());
}
if (keyChecksums.isEmpty()) {
return;
}
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
}
for (Server member : getServers()) {
if (NetUtils.localServer().equals(member.getKey())) {
continue;
}
NamingProxy.syncCheckSums(keyChecksums, member.getKey());
}
} catch (Exception e) {
Loggers.DISTRO.error("timed sync task failed.", e);
}
}
}
public List<Server> getServers() {
return serverListManager.getHealthyServers();
}
public String buildKey(String key, String targetServer) {
return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
}
}
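Note that TimedSync only pushes key -> checksum pairs for the keys this node is responsible for. The receiving side (not shown above) compares those checksums with its local data and pulls the keys that are missing or stale. A self-contained sketch of that comparison on plain maps; the real handler works on DataStore and Datum objects, so this only illustrates the idea:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ChecksumDiffDemo {
    // Decide which keys must be re-fetched from the remote node. Purely illustrative.
    static List<String> keysToFetch(Map<String, String> remoteChecksums, Map<String, String> localChecksums) {
        List<String> toFetch = new ArrayList<>();
        for (Map.Entry<String, String> e : remoteChecksums.entrySet()) {
            String localChecksum = localChecksums.get(e.getKey());
            if (localChecksum == null || !localChecksum.equals(e.getValue())) {
                toFetch.add(e.getKey()); // missing or stale locally
            }
        }
        return toFetch;
    }

    public static void main(String[] args) {
        Map<String, String> remote = Map.of("serviceA", "c1", "serviceB", "c2");
        Map<String, String> local = Map.of("serviceA", "c1", "serviceB", "stale");
        System.out.println(keysToFetch(remote, local)); // [serviceB]
    }
}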
2.1.7 Summary of the Nacos partitioned (Distro) consistency algorithm
2.2 RaftConsistencyServiceImpl
RaftConsistencyServiceImpl is the only implementation of PersistentConsistencyService.
The persistent-node consistency protocol it implements is based on the Raft algorithm and guarantees CP consistency for published data.
2.2.1 The Raft algorithm
The Raft algorithm serves three main purposes:
- electing the cluster Leader when the service starts;
- crash recovery: when the Leader goes down, electing a new Leader from the Followers;
- data consistency: the Leader handles transaction (write) requests and the Followers only handle non-transaction requests; if a Follower receives a transaction request it forwards it to the Leader, which processes it and then replicates the result to the Followers.
The core idea of Raft Leader election: first come, first served, and the majority rules.
Below is a brief walkthrough of how Raft elects a Leader at service startup, where Node denotes a node and Term denotes an election term.
Meaning of a term: every round of election is one term.
1. When all servers start, each server starts a randomized Leader-election countdown (election timeout).
2. When a node's countdown reaches 0, it enters the candidate state (for example, Node B).
3. The candidate node then initiates a Leader vote (first come, first served).
4. When the other nodes receive the vote request, they return their response to the candidate.
5. When a candidate has collected votes from more than 1/2 of the nodes in the cluster, it becomes the Leader and the other nodes become Followers (the majority rules).
6. The Leader then sends a heartbeat packet to the Followers at a fixed interval to keep the leadership alive.
7. Each Follower runs a heartbeat countdown: if a heartbeat arrives within the countdown, it replies to the Leader and restarts the countdown; if not, it actively probes the Leader, and if the Leader turns out to be down, a new Leader election starts.
When the Leader of a running cluster goes down, steps similar to 1-7 above are repeated to elect a new Leader.
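A toy, single-process simulation of the "randomized timeout + majority vote" idea from steps 1-5 above. It is only an illustration of the election rule, not the Nacos implementation:
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

public class ToyRaftElection {
    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("A", "B", "C");
        Random random = new Random();

        // Step 1: every node gets a randomized election timeout.
        Map<String, Integer> timeouts = new HashMap<>();
        for (String node : nodes) {
            timeouts.put(node, 150 + random.nextInt(150));
        }

        // Step 2: the node whose timeout fires first becomes the candidate.
        String candidate = Collections.min(timeouts.entrySet(), Map.Entry.comparingByValue()).getKey();

        // Steps 3-4: the candidate votes for itself and asks the others; in this toy run
        // no other candidate exists yet, so every node grants its vote (first come, first served).
        int votes = nodes.size();

        // Step 5: more than half of the cluster makes it the Leader (the majority rules).
        int majority = nodes.size() / 2 + 1;
        System.out.println(candidate + (votes >= majority ? " becomes LEADER with " : " stays CANDIDATE with ")
                + votes + " of " + nodes.size() + " votes");
    }
}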
2.2.2 The Raft implementation in Nacos
As mentioned above, RaftConsistencyServiceImpl implements the persistent-node consistency protocol with the Raft algorithm, guaranteeing CP consistency for published data.
Its core logic is as follows:
@Service
public class RaftConsistencyServiceImpl implements PersistentConsistencyService {
@Autowired
private RaftCore raftCore;
@Autowired
private RaftPeerSet peers;
@Autowired
private SwitchDomain switchDomain;
@Override
public void put(String key, Record value) throws NacosException {
try {
raftCore.signalPublish(key, value);
} catch (Exception e) {
Loggers.RAFT.error("Raft put failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value, e);
}
}
@Override
public void remove(String key) throws NacosException {
try {
if (KeyBuilder.matchInstanceListKey(key) && !raftCore.isLeader()) {
Datum datum = new Datum();
datum.key = key;
raftCore.onDelete(datum.key, peers.getLeader());
raftCore.unlistenAll(key);
return;
}
raftCore.signalDelete(key);
raftCore.unlistenAll(key);
} catch (Exception e) {
Loggers.RAFT.error("Raft remove failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft remove failed, key:" + key, e);
}
}
@Override
public Datum get(String key) throws NacosException {
return raftCore.getDatum(key);
}
@Override
public void listen(String key, RecordListener listener) throws NacosException {
raftCore.listen(key, listener);
}
@Override
public void unlisten(String key, RecordListener listener) throws NacosException {
raftCore.unlisten(key, listener);
}
@Override
public boolean isAvailable() {
return raftCore.isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
}
public void onPut(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onPublish(datum, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onPut failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft onPut failed, datum:" + datum + ", source: " + source, e);
}
}
public void onRemove(Datum datum, RaftPeer source) throws NacosException {
try {
raftCore.onDelete(datum.key, source);
} catch (Exception e) {
Loggers.RAFT.error("Raft onRemove failed.", e);
throw new NacosException(NacosException.SERVER_ERROR, "Raft onRemove failed, datum:" + datum + ", source: " + source, e);
}
}
}
As the code shows, the Raft implementation centers on two classes, RaftCore and RaftPeerSet, whose roles are:
- RaftPeerSet: the core of Raft Leader election
- RaftCore: the core Raft implementation
2.2.2.1 RaftPeerSet (Leader election)
The main responsibility of this class is Raft Leader election, which covers two scenarios:
- Leader election while the service is starting up;
- electing a new Leader after the current Leader goes down.
The class relationships of RaftPeerSet are:
- ServerChangeListener: listens for changes to the server list
- @DependsOn("serverListManager"): when wiring the beans, serverListManager is loaded first and then this instance
- ApplicationContextAware: used to obtain the ApplicationContext
- @Component: registers the bean in the Spring IoC container
Let's look at the implementation.
@Component
@DependsOn("serverListManager")
public class RaftPeerSet implements ServerChangeListener, ApplicationContextAware {
@Autowired
private ServerListManager serverListManager;
private ApplicationContext applicationContext;
private AtomicLong localTerm = new AtomicLong(0L);
private RaftPeer leader = null;
private Map<String, RaftPeer> peers = new HashMap<>();
private Set<String> sites = new HashSet<>();
private boolean ready = false;
public RaftPeerSet() {
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@PostConstruct
public void init() {
serverListManager.listen(this);
}
public RaftPeer getLeader() {
if (STANDALONE_MODE) {
return local();
}
return leader;
}
public Set<String> allSites() {
return sites;
}
public boolean isReady() {
return ready;
}
public void remove(List<String> servers) {
for (String server : servers) {
peers.remove(server);
}
}
public RaftPeer update(RaftPeer peer) {
peers.put(peer.ip, peer);
return peer;
}
public boolean isLeader(String ip) {
if (STANDALONE_MODE) {
return true;
}
if (leader == null) {
Loggers.RAFT.warn("[IS LEADER] no leader is available now!");
return false;
}
return StringUtils.equals(leader.ip, ip);
}
public Set<String> allServersIncludeMyself() {
return peers.keySet();
}
public Set<String> allServersWithoutMySelf() {
Set<String> servers = new HashSet<String>(peers.keySet());
servers.remove(local().ip);
return servers;
}
public Collection<RaftPeer> allPeers() {
return peers.values();
}
public int size() {
return peers.size();
}
public RaftPeer decideLeader(RaftPeer candidate) {
peers.put(candidate.ip, candidate);
SortedBag ips = new TreeBag();
int maxApproveCount = 0;
String maxApprovePeer = null;
for (RaftPeer peer : peers.values()) {
if (StringUtils.isEmpty(peer.voteFor)) {
continue;
}
ips.add(peer.voteFor);
if (ips.getCount(peer.voteFor) > maxApproveCount) {
maxApproveCount = ips.getCount(peer.voteFor);
maxApprovePeer = peer.voteFor;
}
}
if (maxApproveCount >= majorityCount()) {
RaftPeer peer = peers.get(maxApprovePeer);
peer.state = RaftPeer.State.LEADER;
if (!Objects.equals(leader, peer)) {
leader = peer;
applicationContext.publishEvent(new LeaderElectFinishedEvent(this, leader));
Loggers.RAFT.info("{} has become the LEADER", leader.ip);
}
}
return leader;
}
public RaftPeer makeLeader(RaftPeer candidate) {
if (!Objects.equals(leader, candidate)) {
leader = candidate;
applicationContext.publishEvent(new MakeLeaderEvent(this, leader));
Loggers.RAFT.info("{} has become the LEADER, local: {}, leader: {}",
leader.ip, JSON.toJSONString(local()), JSON.toJSONString(leader));
}
for (final RaftPeer peer : peers.values()) {
Map<String, String> params = new HashMap<>(1);
if (!Objects.equals(peer, candidate) && peer.state == RaftPeer.State.LEADER) {
try {
String url = RaftCore.buildURL(peer.ip, RaftCore.API_GET_PEER);
HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("[NACOS-RAFT] get peer failed: {}, peer: {}",
response.getResponseBody(), peer.ip);
peer.state = RaftPeer.State.FOLLOWER;
return 1;
}
update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
return 0;
}
});
} catch (Exception e) {
peer.state = RaftPeer.State.FOLLOWER;
Loggers.RAFT.error("[NACOS-RAFT] error while getting peer from peer: {}", peer.ip);
}
}
}
return update(candidate);
}
public RaftPeer local() {
RaftPeer peer = peers.get(NetUtils.localServer());
if (peer == null && SystemUtils.STANDALONE_MODE) {
RaftPeer localPeer = new RaftPeer();
localPeer.ip = NetUtils.localServer();
localPeer.term.set(localTerm.get());
peers.put(localPeer.ip, localPeer);
return localPeer;
}
if (peer == null) {
throw new IllegalStateException("unable to find local peer: " + NetUtils.localServer() + ", all peers: "
+ Arrays.toString(peers.keySet().toArray()));
}
return peer;
}
public RaftPeer get(String server) {
return peers.get(server);
}
public int majorityCount() {
return peers.size() / 2 + 1;
}
public void reset() {
leader = null;
for (RaftPeer peer : peers.values()) {
peer.voteFor = null;
}
}
public void setTerm(long term) {
localTerm.set(term);
}
public long getTerm() {
return localTerm.get();
}
public boolean contains(RaftPeer remote) {
return peers.containsKey(remote.ip);
}
@Override
public void onChangeServerList(List<Server> latestMembers) {
Map<String, RaftPeer> tmpPeers = new HashMap<>(8);
for (Server member : latestMembers) {
if (peers.containsKey(member.getKey())) {
tmpPeers.put(member.getKey(), peers.get(member.getKey()));
continue;
}
RaftPeer raftPeer = new RaftPeer();
raftPeer.ip = member.getKey();
if (NetUtils.localServer().equals(member.getKey())) {
raftPeer.term.set(localTerm.get());
}
tmpPeers.put(member.getKey(), raftPeer);
}
peers = tmpPeers;
if (RunningConfig.getServerPort() > 0) {
ready = true;
}
Loggers.RAFT.info("raft peers changed: " + latestMembers);
}
@Override
public void onChangeHealthyServerList(List<Server> latestReachableMembers) {
}
}
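decideLeader counts the voteFor fields of all peers with a TreeBag and only promotes a peer to LEADER once its vote count reaches majorityCount(), i.e. strictly more than half of the cluster. A quick worked illustration of that threshold (plain arithmetic, not Nacos code):
public class MajorityDemo {
    // Same formula as RaftPeerSet.majorityCount(): n / 2 + 1 with integer division.
    static int majorityCount(int peers) {
        return peers / 2 + 1;
    }

    public static void main(String[] args) {
        System.out.println(majorityCount(3)); // 2 votes needed in a 3-node cluster
        System.out.println(majorityCount(4)); // 3 votes needed in a 4-node cluster
        System.out.println(majorityCount(5)); // 3 votes needed in a 5-node cluster
    }
}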
2.2.2.2 RaftCore
RaftCore is the core of the Raft implementation in a Nacos cluster, covering Leader election, forwarding transactions to the Leader, and so on. The key pieces of logic are explained below.
1. RaftCore init()
@Component
public class RaftCore {
@PostConstruct
public void init() throws Exception {
Loggers.RAFT.info("initializing Raft sub-system");
executor.submit(notifier);
long start = System.currentTimeMillis();
raftStore.loadDatums(notifier, datums);
setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
while (true) {
if (notifier.tasks.size() <= 0) {
break;
}
Thread.sleep(1000L);
}
initialized = true;
Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
GlobalExecutor.registerMasterElection(new MasterElection());
GlobalExecutor.registerHeartbeat(new HeartBeat());
Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}
}
2. The Notifier thread (event notification)
Notifier is an inner class of RaftCore that implements the event-notification mechanism: whenever data is changed or deleted, all registered Listeners are notified.
Like its Distro counterpart, it uses the producer-consumer pattern with the blocking queue BlockingQueue<Pair> tasks.
public class RaftCore {
private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();
public class Notifier implements Runnable {
private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<>(1024 * 1024);
public void addTask(String datumKey, ApplyAction action) {
if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
Loggers.RAFT.info("add task {}", datumKey);
tasks.add(Pair.with(datumKey, action));
}
public int getTaskSize() {
return tasks.size();
}
@Override
public void run() {
Loggers.RAFT.info("raft notifier started");
while (true) {
try {
Pair pair = tasks.take();
if (pair == null) {
continue;
}
String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();
services.remove(datumKey);
Loggers.RAFT.info("remove task {}", datumKey);
int count = 0;
if (listeners.containsKey(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
if (KeyBuilder.matchServiceMetaKey(datumKey) && !KeyBuilder.matchSwitchKey(datumKey)) {
for (RecordListener listener : listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX)) {
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, getDatum(datumKey).value);
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", datumKey, e);
}
}
}
}
if (!listeners.containsKey(datumKey)) {
continue;
}
for (RecordListener listener : listeners.get(datumKey)) {
count++;
try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, getDatum(datumKey).value);
continue;
}
if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] error while notifying listener of key: {}", datumKey, e);
}
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[NACOS-RAFT] datum change notified, key: {}, listener count: {}", datumKey, count);
}
} catch (Throwable e) {
Loggers.RAFT.error("[NACOS-RAFT] Error while handling notifying task", e);
}
}
}
}
}
3. The MasterElection thread (leader election)
It elects the Leader node of the Nacos cluster.
It runs once every 500 ms (GlobalExecutor.TICK_PERIOD_MS).
public class RaftCore {
public class MasterElection implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.leaderDueMs > 0) {
return;
}
local.resetLeaderDue();
local.resetHeartbeatDue();
sendVote();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while master election {}", e);
}
}
public void sendVote() {
RaftPeer local = peers.get(NetUtils.localServer());
Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}",
JSON.toJSONString(getLeader()), local.term);
peers.reset();
local.term.incrementAndGet();
local.voteFor = local.ip;
local.state = RaftPeer.State.CANDIDATE;
Map<String, String> params = new HashMap<>(1);
params.put("vote", JSON.toJSONString(local));
for (final String server : peers.allServersWithoutMySelf()) {
final String url = buildURL(server, API_VOTE);
try {
HttpClient.asyncHttpPost(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", response.getResponseBody(), url);
return 1;
}
RaftPeer peer = JSON.parseObject(response.getResponseBody(), RaftPeer.class);
Loggers.RAFT.info("received approve from peer: {}", JSON.toJSONString(peer));
peers.decideLeader(peer);
return 0;
}
});
} catch (Exception e) {
Loggers.RAFT.warn("error while sending vote to server: {}", server);
}
}
}
}
}
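Each run decrements local.leaderDueMs by TICK_PERIOD_MS and only starts a vote once the countdown drops to 0 or below; resetLeaderDue() then re-arms it with a randomized value so that the nodes rarely time out at the same moment. Below is a hedged sketch of that countdown behaviour; the 15 s base plus up to 5 s of jitter mirrors the Nacos 1.x defaults but should be treated as an assumption:
import java.util.concurrent.ThreadLocalRandom;

public class LeaderDueDemo {
    static final long TICK_PERIOD_MS = 500L;        // how often MasterElection runs
    static final long LEADER_TIMEOUT_MS = 15_000L;  // assumed base election timeout
    static final long RANDOM_MS = 5_000L;           // assumed random jitter range

    // Randomized re-arm of the election countdown (assumption, see the lead-in).
    static long resetLeaderDue() {
        return LEADER_TIMEOUT_MS + ThreadLocalRandom.current().nextLong(0, RANDOM_MS);
    }

    public static void main(String[] args) {
        long leaderDueMs = resetLeaderDue();
        int ticks = 0;
        // Simulate MasterElection ticks until the countdown expires and a vote would start.
        while (leaderDueMs > 0) {
            leaderDueMs -= TICK_PERIOD_MS;
            ticks++;
        }
        System.out.println("a vote would start after " + ticks + " ticks (~" + ticks * TICK_PERIOD_MS + " ms)");
    }
}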
In the MasterElection thread, a node sends a vote request (/v1/ns/raft/vote) to the other nodes. When a node receives such a request, it eventually calls RaftCore.receivedVote(RaftPeer remote) and returns its own vote to the requesting node. The handling logic is:
@Component
public class RaftCore {
public RaftPeer receivedVote(RaftPeer remote) {
if (!peers.contains(remote)) {
throw new IllegalStateException("can not find peer: " + remote.ip);
}
RaftPeer local = peers.get(NetUtils.localServer());
if (remote.term.get() <= local.term.get()) {
String msg = "received illegitimate vote" +
", voter-term:" + remote.term + ", votee-term:" + local.term;
Loggers.RAFT.info(msg);
if (StringUtils.isEmpty(local.voteFor)) {
local.voteFor = local.ip;
}
return local;
}
local.resetLeaderDue();
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
local.term.set(remote.term.get());
Loggers.RAFT.info("vote {} as leader, term: {}", remote.ip, remote.term);
return local;
}
}
4. The HeartBeat thread (heartbeat)
HeartBeat is used by the Leader node of a Nacos cluster to send heartbeat packets to the Follower nodes and keep the leadership alive.
@Component
public class RaftCore {
public class HeartBeat implements Runnable {
@Override
public void run() {
try {
if (!peers.isReady()) {
return;
}
RaftPeer local = peers.local();
local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
if (local.heartbeatDueMs > 0) {
return;
}
local.resetHeartbeatDue();
sendBeat();
} catch (Exception e) {
Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
}
}
public void sendBeat() throws IOException, InterruptedException {
RaftPeer local = peers.local();
if (local.state != RaftPeer.State.LEADER && !STANDALONE_MODE) {
return;
}
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
}
local.resetLeaderDue();
JSONObject packet = new JSONObject();
packet.put("peer", local);
JSONArray array = new JSONArray();
if (switchDomain.isSendBeatOnly()) {
Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", String.valueOf(switchDomain.isSendBeatOnly()));
}
if (!switchDomain.isSendBeatOnly()) {
for (Datum datum : datums.values()) {
JSONObject element = new JSONObject();
if (KeyBuilder.matchServiceMetaKey(datum.key)) {
element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
} else if (KeyBuilder.matchInstanceListKey(datum.key)) {
element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
}
element.put("timestamp", datum.timestamp);
array.add(element);
}
}
packet.put("datums", array);
Map<String, String> params = new HashMap<String, String>(1);
params.put("beat", JSON.toJSONString(packet));
String content = JSON.toJSONString(params);
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out);
gzip.write(content.getBytes(StandardCharsets.UTF_8));
gzip.close();
byte[] compressedBytes = out.toByteArray();
String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}",
content.length(), compressedContent.length());
}
for (final String server : peers.allServersWithoutMySelf()) {
try {
final String url = buildURL(server, API_BEAT);
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("send beat to server " + server);
}
HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}",
response.getResponseBody(), server);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
return 1;
}
peers.update(JSON.parseObject(response.getResponseBody(), RaftPeer.class));
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("receive beat response from: {}", url);
}
return 0;
}
@Override
public void onThrowable(Throwable t) {
Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server, t);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
});
} catch (Exception e) {
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
}
}
}
In the HeartBeat thread, the Leader sends a heartbeat packet (/v1/ns/raft/beat) to the other Follower nodes. When a Follower receives the heartbeat, it eventually calls RaftCore.receivedBeat(JSONObject beat); the handling logic is:
@Component
public class RaftCore {
public RaftPeer receivedBeat(JSONObject beat) throws Exception {
final RaftPeer local = peers.local();
final RaftPeer remote = new RaftPeer();
remote.ip = beat.getJSONObject("peer").getString("ip");
remote.state = RaftPeer.State.valueOf(beat.getJSONObject("peer").getString("state"));
remote.term.set(beat.getJSONObject("peer").getLongValue("term"));
remote.heartbeatDueMs = beat.getJSONObject("peer").getLongValue("heartbeatDueMs");
remote.leaderDueMs = beat.getJSONObject("peer").getLongValue("leaderDueMs");
remote.voteFor = beat.getJSONObject("peer").getString("voteFor");
if (remote.state != RaftPeer.State.LEADER) {
Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}",
remote.state, JSON.toJSONString(remote));
throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
}
if (local.term.get() > remote.term.get()) {
Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}"
, remote.term.get(), local.term.get(), JSON.toJSONString(remote), local.leaderDueMs);
throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
+ ", beat-to-term: " + local.term.get());
}
if (local.state != RaftPeer.State.FOLLOWER) {
Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JSON.toJSONString(remote));
local.state = RaftPeer.State.FOLLOWER;
local.voteFor = remote.ip;
}
final JSONArray beatDatums = beat.getJSONArray("datums");
local.resetLeaderDue();
local.resetHeartbeatDue();
peers.makeLeader(remote);
Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
for (Map.Entry<String, Datum> entry : datums.entrySet()) {
receivedKeysMap.put(entry.getKey(), 0);
}
List<String> batch = new ArrayList<>();
if (!switchDomain.isSendBeatOnly()) {
int processedCount = 0;
if (Loggers.RAFT.isDebugEnabled()) {
Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
}
for (Object object : beatDatums) {
processedCount = processedCount + 1;
JSONObject entry = (JSONObject) object;
String key = entry.getString("key");
final String datumKey;
if (KeyBuilder.matchServiceMetaKey(key)) {
datumKey = KeyBuilder.detailServiceMetaKey(key);
} else if (KeyBuilder.matchInstanceListKey(key)) {
datumKey = KeyBuilder.detailInstanceListkey(key);
} else {
continue;
}
long timestamp = entry.getLong("timestamp");
receivedKeysMap.put(datumKey, 1);
try {
if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp && processedCount < beatDatums.size()) {
continue;
}
if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
batch.add(datumKey);
}
if (batch.size() < 50 && processedCount < beatDatums.size()) {
continue;
}
String keys = StringUtils.join(batch, ",");
if (batch.size() <= 0) {
continue;
}
Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}, datums' size is {}, RaftCore.datums' size is {}"
, getLeader().ip, batch.size(), processedCount, beatDatums.size(), datums.size());
String url = buildURL(remote.ip, API_GET) + "?keys=" + URLEncoder.encode(keys, "UTF-8");
HttpClient.asyncHttpGet(url, null, null, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
List<JSONObject> datumList = JSON.parseObject(response.getResponseBody(), new TypeReference<List<JSONObject>>() {
});
for (JSONObject datumJson : datumList) {
OPERATE_LOCK.lock();
Datum newDatum = null;
try {
Datum oldDatum = getDatum(datumJson.getString("key"));
if (oldDatum != null && datumJson.getLongValue("timestamp") <= oldDatum.timestamp.get()) {
Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
datumJson.getString("key"), datumJson.getLongValue("timestamp"), oldDatum.timestamp);
continue;
}
if (KeyBuilder.matchServiceMetaKey(datumJson.getString("key"))) {
Datum<Service> serviceDatum = new Datum<>();
serviceDatum.key = datumJson.getString("key");
serviceDatum.timestamp.set(datumJson.getLongValue("timestamp"));
serviceDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Service.class);
newDatum = serviceDatum;
}
if (KeyBuilder.matchInstanceListKey(datumJson.getString("key"))) {
Datum<Instances> instancesDatum = new Datum<>();
instancesDatum.key = datumJson.getString("key");
instancesDatum.timestamp.set(datumJson.getLongValue("timestamp"));
instancesDatum.value =
JSON.parseObject(JSON.toJSONString(datumJson.getJSONObject("value")), Instances.class);
newDatum = instancesDatum;
}
if (newDatum == null || newDatum.value == null) {
Loggers.RAFT.error("receive null datum: {}", datumJson);
continue;
}
raftStore.write(newDatum);
datums.put(newDatum.key, newDatum);
notifier.addTask(newDatum.key, ApplyAction.CHANGE);
local.resetLeaderDue();
if (local.term.get() + 100 > remote.term.get()) {
getLeader().term.set(remote.term.get());
local.term.set(getLeader().term.get());
} else {
local.term.addAndGet(100);
}
raftStore.updateTerm(local.term.get());
Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
newDatum.key, newDatum.timestamp, JSON.toJSONString(remote), local.term);
} catch (Throwable e) {
Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}", newDatum, e);
} finally {
OPERATE_LOCK.unlock();
}
}
TimeUnit.MILLISECONDS.sleep(200);
return 0;
}
});
batch.clear();
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
}
}
List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
if (entry.getValue() == 0) {
deadKeys.add(entry.getKey());
}
}
for (String deadKey : deadKeys) {
try {
deleteDatum(deadKey);
} catch (Exception e) {
Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
}
}
}
return local;
}
}
5. RaftCore.signalPublish()
The signalPublish(String key, Record value) method of RaftCore handles transaction (write) requests:
- It first checks whether the current node is the Leader.
- If it is the Leader, it processes the request directly; the write only succeeds once more than half of the cluster nodes have acknowledged it.
- Otherwise, it forwards the request to the Leader for processing.
@Component
public class RaftCore {
public void signalPublish(String key, Record value) throws Exception {
if (!isLeader()) {
JSONObject params = new JSONObject();
params.put("key", key);
params.put("value", value);
Map<String, String> parameters = new HashMap<>(1);
parameters.put("key", key);
raftProxy.proxyPostLarge(getLeader().ip, API_PUB, params.toJSONString(), parameters);
return;
}
try {
OPERATE_LOCK.lock();
long start = System.currentTimeMillis();
final Datum datum = new Datum();
datum.key = key;
datum.value = value;
if (getDatum(key) == null) {
datum.timestamp.set(1L);
} else {
datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
}
JSONObject json = new JSONObject();
json.put("datum", datum);
json.put("source", peers.local());
onPublish(datum, peers.local());
final String content = JSON.toJSONString(json);
final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
for (final String server : peers.allServersIncludeMyself()) {
if (isLeader(server)) {
latch.countDown();
continue;
}
final String url = buildURL(server, API_ON_PUB);
HttpClient.asyncHttpPostLarge(url, Arrays.asList("key=" + key), content, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
datum.key, server, response.getStatusCode());
return 1;
}
latch.countDown();
return 0;
}
@Override
public STATE onContentWriteCompleted() {
return STATE.CONTINUE;
}
});
}
if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
}
long end = System.currentTimeMillis();
Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
} finally {
OPERATE_LOCK.unlock();
}
}
}
2.2.3 Summary of the Raft algorithm in Nacos