IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【springcloud】springcloud alibaba生态中间件原理简述 -> 正文阅读

[Java知识库]【springcloud】springcloud alibaba生态中间件原理简述

注:文章为博主自用版

nacos

nacos原理简述

nacos优势(自理解):注册中心+配置中心(比如阿波罗这种)+自带负载均衡,功能比较齐全,且阿里系中间件 经历了双十一的考验,国产,文档易读且社区活跃。

注册中心原理: 配置注册到注册中心(registerInstance方法),注册中心得到了客户端地址和实例名,通过心跳检测机制,知道是否存活,如果一段时间内发现不存在心跳 则认定不存在 踢下线。

ribbon: nacos集成了ribbon , 原理是有一个拦截器,将实例名解析成网址,在本地缓存。

nacos一致性协议:
Distro协议是Nacos社区自研的一种AP分布式协议,是面向临时实例设计的一种分布式协议,其保证在某些Nacos节点宕机后,整个临时实例处理系统依旧可以正常工作。作为一种有状态的中间件应用内嵌协议,Distro保证了各个Nacos节点对于注册请求的统一协调和储存。

Distro协议的主要设计思想如下:

  • Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。
  • 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。
  • 每个节点独立处理读请求,及时从本地发出响应。

新加入的 Distro 节点会进行全量数据拉取。具体操作是轮询所有的 Distro 节点,通过向其他的机器发送请求拉取全量数据。
在这里插入图片描述在全量拉取操作完成之后,Nacos 的每台机器上都维护了当前的所有注册上来的非持久化实例数据。

在 Distro 集群启动之后,各台机器之间会定期的发送心跳。心跳信息主要为各个机器上的所有数据的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在?个较低水平)。这种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起?次数据校验请求。

?旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不?致,则会发起?次全量拉取请求,将数据补齐。

在这里插入图片描述

nacos原理源码分析

AP模式集群数据同步源码分析

全量数据同步

首先我们需要先找到DistroProtocol类型,它就是Distro协议的实现,然后在它的构造方法中,启动了一个startDistroTask()任务,其中包括了初始化同步任务 startLoadTask()

private void startDistroTask() {
    if (EnvUtil.getStandaloneMode()) {
        isInitialized = true;
        return;
    }
    startVerifyTask();
    // 初始化同步任务
    startLoadTask();
}

startLoadTask()数据加载任务创建了一个DistroLoadDataTask任务,并传入了一个修改当前节点Distro协议完成状态的回调函数。

private void startLoadTask() {
    DistroCallback loadCallback = new DistroCallback() {
        @Override
        public void onSuccess() {
            isInitialized = true;
        }

        @Override
        public void onFailed(Throwable throwable) {
            isInitialized = false;
        }
    };
    //传入了一个修改当前节点Distro协议完成状态的回调函数。
    GlobalExecutor.submitLoadDataTask(
        new DistroLoadDataTask(memberManager, distroComponentHolder, DistroConfig.getInstance(), loadCallback));
}

接下来我们需要查看load方法,这里判断了几种情况,其中loadAllDataSnapshotFromRemote(读取所有远程的数据快照)

private void load() throws Exception {
    // 若除了自身之外没有其他节点,则休眠一秒,可以其他节点未启动
    while (memberManager.allMembersWithoutSelf().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init...");
        TimeUnit.SECONDS.sleep(1);
    }
    // 若数据类型为空,说明distroComponentHolder的组件注册器还未初始化完毕
    while (distroComponentHolder.getDataStorageTypes().isEmpty()) {
        Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register...");
        TimeUnit.SECONDS.sleep(1);
    }
    // 加载每个类型的数据
    for (String each : distroComponentHolder.getDataStorageTypes()) {
        if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) {
            // 调用加载方法,并标记已处理
            loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each));
        }
    }
}

调用loadAllDataSnapshotFromRemote(each)方法获取同步数据,从其他节点获取同步数据,使用DistroTransportAgent获取数据,使用DistroDataProcessor来处理数据。

private boolean loadAllDataSnapshotFromRemote(String resourceType) {
    // 获取数据传输对象
    DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType);
    // 获取数据处理器
    DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);
    if (null == transportAgent || null == dataProcessor) {
        Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}",
                            resourceType, transportAgent, dataProcessor);
        return false;
    }
    // 向每个节点请求数据
    for (Member each : memberManager.allMembersWithoutSelf()) {
        try {
            Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress());
            // 获取数据
            DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress());
            // 解析数据
            boolean result = dataProcessor.processSnapshot(distroData);
            Loggers.DISTRO
                .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(),
                      result);
            // 若解析成功,标记此类型数据已经加载完毕
            if (result) {
                distroComponentHolder.findDataStorage(resourceType).finishInitial();
                return true;
            }
        } catch (Exception e) {
            Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e);
        }
    }
    return false;
}

这里我们要查看一下如何获取的数据,使用DistroTransportAgent获取数据getDatumSnapshot()方法,看完这个方法我们还要看一下如何处理数据。

@Override
public DistroData getDatumSnapshot(String targetServer) {
    // 从节点管理器获取目标节点信息
    Member member = memberManager.find(targetServer);
    // 判断目标服务器是否健康
    if (checkTargetServerStatusUnhealthy(member)) {
        throw new DistroException(
            String.format("[DISTRO] Cancel get snapshot caused by target server %s unhealthy", targetServer));
    }
    // 构建请求参数
    DistroDataRequest request = new DistroDataRequest();
    // 设置请求的操作类型为DataOperation.SNAPSHOT(数据快照)
    request.setDataOperation(DataOperation.SNAPSHOT);
    try {
        Response response = clusterRpcClientProxy.sendRequest(member, request);
        if (checkResponse(response)) {
            return ((DistroDataResponse) response).getDistroData();
        } else {
            throw new DistroException(
                String.format("[DISTRO-FAILED] Get snapshot request to %s failed, code: %d, message: %s",
                              targetServer, response.getErrorCode(), response.getMessage()));
        }
    } catch (NacosException e) {
        throw new DistroException("[DISTRO-FAILED] Get distro snapshot failed! ", e);
    }
}

使用DistroDataProcessor来处理数据processSnapshot(distroData)方法

@Override
public boolean processSnapshot(DistroData distroData) {
    // 反序列化获取distroData为ClientSyncDatumSnapshot
    ClientSyncDatumSnapshot snapshot = ApplicationUtils.getBean(Serializer.class)
        .deserialize(distroData.getContent(), ClientSyncDatumSnapshot.class);
    // 处理结果集,这里将返回远程节点负责的所有Client以及所有的service、instance信息
    for (ClientSyncData each : snapshot.getClientSyncDataList()) {
        // 处理每一个client
        handlerClientSyncData(each);
    }
    return true;
}

具体处理数据方法handlerClientSyncData()

private void handlerClientSyncData(ClientSyncData clientSyncData) {
    Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());
    // 因为是同步数据,所以这里缓存数据
    clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());
    Client client = clientManager.getClient(clientSyncData.getClientId());
    // 升级客户端服务信息
    upgradeClient(client, clientSyncData);
}

这里的核心点就是upgradeClient方法,此方法的目的就是同步各个节点的数据

private void upgradeClient(Client client, ClientSyncData clientSyncData) {
    List<String> namespaces = clientSyncData.getNamespaces();
    List<String> groupNames = clientSyncData.getGroupNames();
    List<String> serviceNames = clientSyncData.getServiceNames();
    List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();
    // 已同步的服务集合
    Set<Service> syncedService = new HashSet<>();
    for (int i = 0; i < namespaces.size(); i++) {
        //从获取的数据中构建一个Service对象
        Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));
        //单例模式
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // 标记此service已经被处理
        syncedService.add(singleton);
        // 获取当前实例
        InstancePublishInfo instancePublishInfo = instances.get(i);
        // 判断是否包含当前获取的实例
        if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {
            // 不包含则添加
            client.addServiceInstance(singleton, instancePublishInfo);
            // 当前节点发布服务注册事件
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));
        }
    }
    // 若当前client内部已发布的service不在本次同步的列表内,说明已经过时了,要删掉
    for (Service each : client.getAllPublishedService()) {
        if (!syncedService.contains(each)) {
            client.removeServiceInstance(each);
            NotifyCenter.publishEvent(
                new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));
        }
    }
}

到这里就完成了数据初始化同步,Nacos每台机器上都维护了当前所有注册上来的非持久化实例数据。

但是各位要注意的是,这只是一个新的Nacos节点上线的同步数据操作,那么如果某个注册的客户端节点改变那,那么所有的Nacos节点都是需要数据同步的

增量数据同步

数据完成初始化后,节点的数据发生变化后需要将增量数据同步到其他节点。

这里我们就要关注的重点为:

DistroClientDataProcessor类继承了SmartSubscriber,遵循Subscriber/Notify(订阅发布)模式,当有订阅的事件触发时会进行回调通知。

DistroClientDataProcessor订阅了ClientChangedEvent(服务改变)、ClientDisconnectEvent(服务断开)和ClientVerifyFailedEvent(验证失败)事件。
在subscribeTypes方法中体现

@Override
public List<Class<? extends Event>> subscribeTypes() {
    List<Class<? extends Event>> result = new LinkedList<>();
    result.add(ClientEvent.ClientChangedEvent.class);
    result.add(ClientEvent.ClientDisconnectEvent.class);
    result.add(ClientEvent.ClientVerifyFailedEvent.class);
    return result;
}

这里我们重点关注ClientChangedEvent事件,那么当事件触发时,会调用onEvent方法

@Override
public void onEvent(Event event) {
    if (EnvUtil.getStandaloneMode()) {
        return;
    }
    if (!upgradeJudgement.isUseGrpcFeatures()) {
        return;
    }
    if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
        syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
    } else {
        // 增量同步调用方法
        syncToAllServer((ClientEvent) event);
    }
}

查看syncToAllServer()方法

private void syncToAllServer(ClientEvent event) {
    Client client = event.getClient();
    // Only ephemeral data sync by Distro, persist client should sync by raft.
    if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
        return;
    }
    if (event instanceof ClientEvent.ClientDisconnectEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.DELETE);
        // 节点变更事件,即增量数据同步方法
    } else if (event instanceof ClientEvent.ClientChangedEvent) {
        DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
        distroProtocol.sync(distroKey, DataOperation.CHANGE);
    }
}

查看同步方法sync()向除本节点外的所有节点进行数据同步,对每个节点执行具体的同步逻辑syncToTarget方法。

//通过配置延迟开始同步
//@param distroKey 同步数据的分发密钥
//@param action    数据操作的动作
public void sync(DistroKey distroKey, DataOperation action) {
    sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());
}
//开始将数据同步到所有远程服务器。
//@param distroKey 同步数据的分发密钥
//@param action    数据操作的动作
//@param delay     同步延迟时间
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        syncToTarget(distroKey, action, each.getAddress(), delay);
    }
}

继续跟踪:syncToTarget(方法)

/**
* 开始同步到目标服务器。
*
* @param distroKey    同步数据的分发密钥
* @param action       数据操作的动作
* @param targetServer 目标服务器
* @param delay        同步延迟时间
*/
public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {
    DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                                                  targetServer);
    DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
    distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    if (Loggers.DISTRO.isDebugEnabled()) {
        Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);
    }
}

分析到这里就能清楚增量数据同步的信息了,那么其他服务在接受到数据增量更新以后还会调用upgradeClient(客户端升级方法)来进行数据同步这里和全量的过程就一致了。

openFeign

openFeign原理简述

feignClient: 同样集成了ribbon 跨服务去调用时,会去ribbon里面找。
(rpc协议 在spring中是基于http协议的 可以理解为远程调用 rpc流程:调用——》 序列化——》 通信 ——》 反序列化——》 被调用方),

且openfegin集成了Hystrix(豪猪) 可以用在调用进行降级处理

gateway

gateway原理简述

待补充

sentinel

sentinel原理简述

降级———— fallback = xxx.class 设置降级策略,例如订单服务、积分服务中,积分失败了 在降级中做补偿措施。
. 限流———— 限制访问 给出提示
. 熔断———— 例如配置当降级异常数量、比例达到某个程度时,直接进入熔断(表示直接进入降级方法 而无需进入业务方法)

. 三种常用限流算法:滑动窗口算法(常用) 令牌桶算法 漏桶算法
. 滑动窗口算法 :

senta

senta原理简述

通过undo日志来实现,例如插入一条数据,生成一条删除语句,如果有异常则执行删除语句。
seata首推AT模式: 主要起一个事务协调者的作用

. 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
. 二阶段:提交异步化,非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。
其它模式:TCC(主要是自定义内容),SAGA等

rocketmq

rocketmq原理简述

线程池、队列组成,队列在rocketmq中 其实是用copyonWriteList完成的

待补充

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-07-21 21:22:54  更:2022-07-21 21:26:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:32:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码