注:文章为博主自用版
nacos
nacos原理简述
nacos优势(自理解):注册中心+配置中心(比如阿波罗这种)+自带负载均衡,功能比较齐全,且阿里系中间件 经历了双十一的考验,国产,文档易读且社区活跃。
注册中心原理: 配置注册到注册中心(registerInstance方法),注册中心得到了客户端地址和实例名,通过心跳检测机制,知道是否存活,如果一段时间内发现不存在心跳 则认定不存在 踢下线。
ribbon: nacos集成了ribbon , 原理是有一个拦截器,将实例名解析成网址,在本地缓存。
nacos一致性协议: Distro协议是Nacos社区自研的一种AP分布式协议,是面向临时实例设计的一种分布式协议,其保证在某些Nacos节点宕机后,整个临时实例处理系统依旧可以正常工作。作为一种有状态的中间件应用内嵌协议,Distro保证了各个Nacos节点对于注册请求的统一协调和储存。
Distro协议的主要设计思想如下:
- Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
- 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
- 每个节点独立处理读请求,及时从本地发出响应。
新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机器发送请求拉取全量数据。 在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数据。
在 Distro 集群启动之后,各台机器之间会定期的发送心跳。心跳信息主要为各个机器上的所有数据的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在?个较低水平)。这种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起?次数据校验请求。
?旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不?致,则会发起?次全量拉取请求,将数据补齐。
nacos原理源码分析
AP模式集群数据同步源码分析
全量数据同步
首先我们需要先找到DistroProtocol类型,它就是Distro协议的实现,然后在它的构造方法中,启动了一个startDistroTask()任务,其中包括了初始化同步任务 startLoadTask()
private void startDistroTask() {
if (EnvUtil.getStandaloneMode()) {
isInitialized = true;
return;
}
startVerifyTask();
startLoadTask();
}
startLoadTask()数据加载任务创建了一个DistroLoadDataTask任务,并传入了一个修改当前节点Distro协议完成状态的回调函数。
private void startLoadTask() {
DistroCallback loadCallback = new DistroCallback() {
@Override
public void onSuccess() {
isInitialized = true;
}
@Override
public void onFailed(Throwable throwable) {
isInitialized = false;
}
};
GlobalExecutor.submitLoadDataTask(
new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}
接下来我们需要查看load方法,这里判断了几种情况,其中loadAllDataSnapshotFromRemote(读取所有远程的数据快照)
private void load() throws Exception {
while (memberManager.allMembersWithoutSelf().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
TimeUnit.SECONDS.sleep(1);
}
while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
TimeUnit.SECONDS.sleep(1);
}
for (String each : distroComponentHolder.getDataStorageTypes()) {
if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
}
}
}
调用loadAllDataSnapshotFromRemote(each)方法获取同步数据,从其他节点获取同步数据,使用DistroTransportAgent获取数据,使用DistroDataProcessor来处理数据。
private boolean loadAllDataSnapshotFromRemote(String resourceType) {
DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
if (null == transportAgent || null == dataProcessor) {
Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
resourceType, transportAgent, dataProcessor);
return false;
}
for (Member each : memberManager.allMembersWithoutSelf()) {
try {
Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
boolean result = dataProcessor.processSnapshot(distroData);
Loggers.DISTRO
.info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
result);
if (result) {
distroComponentHolder.findDataStorage(resourceType).finishInitial();
return true;
}
} catch (Exception e) {
Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
}
}
return false;
}
这里我们要查看一下如何获取的数据,使用DistroTransportAgent获取数据getDatumSnapshot()方法,看完这个方法我们还要看一下如何处理数据。
@Override
public DistroData getDatumSnapshot(String targetServer) {
Member member = memberManager.find(targetServer);
if (checkTargetServerStatusUnhealthy(member)) {
throw new DistroException(
String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
}
DistroDataRequest request = new DistroDataRequest();
request.setDataOperation(DataOperation.SNAPSHOT);
try {
Response response = clusterRpcClientProxy.sendRequest(member, request);
if (checkResponse(response)) {
return ((DistroDataResponse) response).getDistroData();
} else {
throw new DistroException(
String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
targetServer, response.getErrorCode(), response.getMessage()));
}
} catch (NacosException e) {
throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
}
}
使用DistroDataProcessor来处理数据processSnapshot(distroData)方法
@Override
public boolean processSnapshot(DistroData distroData) {
ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
.deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
for (ClientSyncData each : snapshot.getClientSyncDataList()) {
handlerClientSyncData(each);
}
return true;
}
具体处理数据方法handlerClientSyncData()
private void handlerClientSyncData(ClientSyncData clientSyncData) {
Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
Client client = clientManager.getClient(clientSyncData.getClientId());
upgradeClient(client, clientSyncData);
}
这里的核心点就是upgradeClient方法,此方法的目的就是同步各个节点的数据
private void upgradeClient(Client client, ClientSyncData clientSyncData) {
List<String> namespaces = clientSyncData.getNamespaces();
List<String> groupNames = clientSyncData.getGroupNames();
List<String> serviceNames = clientSyncData.getServiceNames();
List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
Set<Service> syncedService = new HashSet<>();
for (int i = 0; i < namespaces.size(); i++) {
Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
Service singleton = ServiceManager.getInstance().getSingleton(service);
syncedService.add(singleton);
InstancePublishInfo instancePublishInfo = instances.get(i);
if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
client.addServiceInstance(singleton, instancePublishInfo);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
}
}
for (Service each : client.getAllPublishedService()) {
if (!syncedService.contains(each)) {
client.removeServiceInstance(each);
NotifyCenter.publishEvent(
new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
}
}
}
到这里就完成了数据初始化同步,Nacos每台机器上都维护了当前所有注册上来的非持久化实例数据。
但是各位要注意的是,这只是一个新的Nacos节点上线的同步数据操作,那么如果某个注册的客户端节点改变那,那么所有的Nacos节点都是需要数据同步的
增量数据同步
数据完成初始化后,节点的数据发生变化后需要将增量数据同步到其他节点。
这里我们就要关注的重点为:
DistroClientDataProcessor类继承了SmartSubscriber,遵循Subscriber/Notify(订阅发布)模式,当有订阅的事件触发时会进行回调通知。
DistroClientDataProcessor订阅了ClientChangedEvent(服务改变)、ClientDisconnectEvent(服务断开)和ClientVerifyFailedEvent(验证失败)事件。 在subscribeTypes方法中体现
@Override
public List<Class<? extends Event>> subscribeTypes() {
List<Class<? extends Event>> result = new LinkedList<>();
result.add(ClientEvent.ClientChangedEvent.class);
result.add(ClientEvent.ClientDisconnectEvent.class);
result.add(ClientEvent.ClientVerifyFailedEvent.class);
return result;
}
这里我们重点关注ClientChangedEvent事件,那么当事件触发时,会调用onEvent方法
@Override
public void onEvent(Event event) {
if (EnvUtil.getStandaloneMode()) {
return;
}
if (!upgradeJudgement.isUseGrpcFeatures()) {
return;
}
if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
} else {
syncToAllServer((ClientEvent) event);
}
}
查看syncToAllServer()方法
private void syncToAllServer(ClientEvent event) {
Client client = event.getClient();
if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
return;
}
if (event instanceof ClientEvent.ClientDisconnectEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.DELETE);
} else if (event instanceof ClientEvent.ClientChangedEvent) {
DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
distroProtocol.sync(distroKey, DataOperation.CHANGE);
}
}
查看同步方法sync()向除本节点外的所有节点进行数据同步,对每个节点执行具体的同步逻辑syncToTarget方法。
public void sync(DistroKey distroKey, DataOperation action) {
sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
public void sync(DistroKey distroKey, DataOperation action, long delay) {
for (Member each : memberManager.allMembersWithoutSelf()) {
syncToTarget(distroKey, action, each.getAddress(), delay);
}
}
继续跟踪:syncToTarget(方法)
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
targetServer);
DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
if (Loggers.DISTRO.isDebugEnabled()) {
Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
}
}
分析到这里就能清楚增量数据同步的信息了,那么其他服务在接受到数据增量更新以后还会调用upgradeClient(客户端升级方法)来进行数据同步这里和全量的过程就一致了。
openFeign
openFeign原理简述
feignClient: 同样集成了ribbon 跨服务去调用时,会去ribbon里面找。 (rpc协议 在spring中是基于http协议的 可以理解为远程调用 rpc流程:调用——》 序列化——》 通信 ——》 反序列化——》 被调用方),
且openfegin集成了Hystrix(豪猪) 可以用在调用进行降级处理
gateway
gateway原理简述
待补充
sentinel
sentinel原理简述
降级———— fallback = xxx.class 设置降级策略,例如订单服务、积分服务中,积分失败了 在降级中做补偿措施。 . 限流———— 限制访问 给出提示 . 熔断———— 例如配置当降级异常数量、比例达到某个程度时,直接进入熔断(表示直接进入降级方法 而无需进入业务方法)
. 三种常用限流算法:滑动窗口算法(常用) 令牌桶算法 漏桶算法 . 滑动窗口算法 :
senta
senta原理简述
通过undo日志来实现,例如插入一条数据,生成一条删除语句,如果有异常则执行删除语句。 seata首推AT模式: 主要起一个事务协调者的作用
. 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。 . 二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。 其它模式:TCC(主要是自定义内容),SAGA等
rocketmq
rocketmq原理简述
线程池、队列组成,队列在rocketmq中 其实是用copyonWriteList完成的
待补充
|