IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Eureka Server端源码启动流程及REST接口解析 -> 正文阅读

[大数据]Eureka Server端源码启动流程及REST接口解析

目录

EurekaBootStrap

initEurekaEnvironment

initEurekaServerContext

ApplicationInfoManager

创建EurekaClient

创建应用实例信息的注册表

初始化EurekaServerContext


EurekaBootStrap

Eureka-Server?启动入口:该类实现了ServletContextListener,在 Servlet 容器( 例如 Tomcat、Jetty )启动时,调用?#contextInitialized()?方法。

    @Override
    public void contextInitialized(ServletContextEvent event) {
        try {
            // 初始化eureka-server配置环境
            initEurekaEnvironment();
            // 初始化eureka-server上下文
            initEurekaServerContext();

            ServletContext sc = event.getServletContext();
            sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
        } catch (Throwable e) {
            logger.error("Cannot bootstrap eureka server :", e);
            throw new RuntimeException("Cannot bootstrap eureka server :", e);
        }
    }

下面依次看下这两步具体操作有哪些

initEurekaEnvironment

protected void initEurekaEnvironment() throws Exception {
        logger.info("Setting the eureka configuration..");

        // 获取数据中心
        String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
        if (dataCenter == null) {
            logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
        } else {
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
        }
        // 获取环境信息
        String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
        if (environment == null) {
            ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
            logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
        }
    }

主要是初始化环境信息,没有很多内容,主要看下初始化上下文的步骤

initEurekaServerContext

protected void initEurekaServerContext() throws Exception {
        EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();

        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

        // 根据server配置,创建服务解码器
        logger.info("Initializing the eureka client...");
        logger.info(eurekaServerConfig.getJsonCodecName());
        ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);

        ApplicationInfoManager applicationInfoManager = null;
        // Eureka-Server 内嵌 Eureka-Client,用于和 Eureka-Server 集群里其他节点通信交互
        if (eurekaClient == null) {
            EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
                    ? new CloudInstanceConfig()
                    : new MyDataCenterInstanceConfig();
            
            applicationInfoManager = new ApplicationInfoManager(
                    instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
            
            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            /**
             * eureka server 的eurekaClient本身也是个DiscoveryClient
             */
            eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = eurekaClient.getApplicationInfoManager();
        }
        // 创建应用实例信息的注册表
        PeerAwareInstanceRegistry registry;
        if (isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
            awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
            awsBinder.start();
        } else {
            registry = new PeerAwareInstanceRegistryImpl(
                    eurekaServerConfig,
                    eurekaClient.getEurekaClientConfig(),
                    serverCodecs,
                    eurekaClient
            );
        }
        // 创建 Eureka-Server 集群节点集合
        PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
                registry,
                eurekaServerConfig,
                eurekaClient.getEurekaClientConfig(),
                serverCodecs,
                applicationInfoManager
        );
        // 创建Eureka-Server上下文(提供初始化、关闭、获取等方法)
        serverContext = new DefaultEurekaServerContext(
                eurekaServerConfig,
                serverCodecs,
                registry,
                peerEurekaNodes,
                applicationInfoManager
        );

        // 初始化 EurekaServerContextHolder,使用它方便获取server的上下文
        EurekaServerContextHolder.initialize(serverContext);
        // 初始化时,其他server注册的client实例信息,通过创建remoteRegionRegistry时,创建线程发起http请求获得(应该也和其他的机制有关)
        serverContext.initialize();
        logger.info("Initialized server context");

        // Copy registry from neighboring eureka node 从其他 Eureka-Server 拉取注册信息
        int registryCount = registry.syncUp();
        registry.openForTraffic(applicationInfoManager, registryCount);

        // Register all monitoring statistics. 注册监控
        EurekaMonitors.registerAllStats();
    }

整个初始化EurekaServer的流程和细节点较多,一点点分析:

ApplicationInfoManager

注释1??:

首先是根据EurekaInstanceConfig、InstanceInfo来创建应用管理类,这两个类作为属性构成ApplicationInfoManager,因此主要看下InstanceInfo的创建代码,通过EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()来得到实例信息:

首先是根据配置文件的心跳间隔时间(默认30秒)、续约过期时间(默认90秒)来创建租约类的builder,租约类是服务发起续约,server过期应用的依据。后续是通过配置文件的设置数据来赋值实例属性,实例ID若有配置则读取配置,否则以主机名作为实例ID。

@Override
    public synchronized InstanceInfo get() {
        if (instanceInfo == null) {
            // Build the lease information to be passed to the server based on config 根据eureka的Client配置构建要传递给服务器的租约信息
            LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
                    .setDurationInSecs(config.getLeaseExpirationDurationInSeconds());

            if (vipAddressResolver == null) {
                vipAddressResolver = new Archaius1VipAddressResolver();
            }

            // Builder the instance information to be registered with eureka server
            InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);

            // set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname 设置instanceId,有若配置读取配置,否则以当前hostname作为实例ID
            String instanceId = config.getInstanceId();
            if (instanceId == null || instanceId.isEmpty()) {
                DataCenterInfo dataCenterInfo = config.getDataCenterInfo();
                if (dataCenterInfo instanceof UniqueIdentifier) {
                    instanceId = ((UniqueIdentifier) dataCenterInfo).getId();
                } else {
                    instanceId = config.getHostName(false);
                }
            }
            // 设置客户端默认地址,主机名或ID地址
            String defaultAddress;
            if (config instanceof RefreshableInstanceConfig) {
                // Refresh AWS data center info, and return up to date address
                defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
            } else {
                defaultAddress = config.getHostName(false);
            }

            // fail safe
            if (defaultAddress == null || defaultAddress.isEmpty()) {
                defaultAddress = config.getIpAddress();
            }

            builder.setNamespace(config.getNamespace())
                    .setInstanceId(instanceId)
                    .setAppName(config.getAppname())
                    .setAppGroupName(config.getAppGroupName())
                    .setDataCenterInfo(config.getDataCenterInfo())
                    .setIPAddr(config.getIpAddress())
                    .setHostName(defaultAddress)
                    .setPort(config.getNonSecurePort())
                    .enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
                    .setSecurePort(config.getSecurePort())
                    .enablePort(PortType.SECURE, config.getSecurePortEnabled())
                    .setVIPAddress(config.getVirtualHostName())
                    .setSecureVIPAddress(config.getSecureVirtualHostName())
                    .setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
                    .setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
                    .setASGName(config.getASGName())
                    .setHealthCheckUrls(config.getHealthCheckUrlPath(),
                            config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());


            // Start off with the STARTING state to avoid traffic
            if (!config.isInstanceEnabledOnit()) {
                InstanceStatus initialStatus = InstanceStatus.STARTING;
                LOG.info("Setting initial instance status as: {}", initialStatus);
                builder.setStatus(initialStatus);
            } else {
                LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
                         + "itself as available. You would instead want to control this via a healthcheck handler.",
                         InstanceStatus.UP);
            }

            // Add any user-specific metadata information 添加一些特定于客户端的元数据信息
            for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
                String key = mapEntry.getKey();
                String value = mapEntry.getValue();
                // only add the metadata if the value is present
                if (value != null && !value.isEmpty()) {
                    builder.add(key, value);
                }
            }

            instanceInfo = builder.build();
            instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
        }
        return instanceInfo;
    }

创建EurekaClient

紧接着通过eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);应用管理类和eurekaClient的配置来创建eurekaClient,可以看到和EurekaClient一样,也是通过创建DiscoveryClient类的方式,来将当前server实例注册、续约注册中心的,从这里也可以看出来,Server实例没有主次之分,每个Server也是将自己作为一个client来注册到注册中心上的。

创建应用实例信息的注册表

后续通过PeerAwareInstanceRegistryImpl进行注册表的创建

public PeerAwareInstanceRegistryImpl(
            EurekaServerConfig serverConfig,
            EurekaClientConfig clientConfig,
            ServerCodecs serverCodecs,
            EurekaClient eurekaClient
    ) {
        super(serverConfig, clientConfig, serverCodecs);
        this.eurekaClient = eurekaClient;
        this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
        // We first check if the instance is STARTING or DOWN, then we check explicit overrides,
        // then we check the status of a potentially existing lease.
        this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
                new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
    }

通过调用父类的实现来初始化属性,然后创建实例状态覆盖规则,初始化时,赋予了三个覆盖规则,当都不满足时,返回默认覆盖规则执行结果。

覆盖状态的作用:

调用 Eureka-Server HTTP Restful 接口?apps/${APP_NAME}/${INSTANCE_ID}/status?对应用实例覆盖状态的变更,从而达到主动的、强制的变更应用实例状态。注意,实际不会真的修改 Eureka-Client 应用实例的状态,而是修改在 Eureka-Server 注册的应用实例的状态

通过这样的方式,Eureka-Client 在获取到注册信息时,并且配置?eureka.shouldFilterOnlyUpInstances = true,过滤掉非?InstanceStatus.UP?的应用实例,从而避免调动该实例,以达到应用实例的暂停服务(?InstanceStatus.OUT_OF_SERVICE?),而无需关闭应用实例

因此,大多数情况下,调用该接口的目的,将应用实例状态在 (?InstanceStatus.UP?) 和 (?InstanceStatus.OUT_OF_SERVICE?) 之间切换。

一、覆盖状态规则

?下线或启动规则(DownOrStartingRule):当当前实例状态不处于UP、OUT_OF_SERVICE时,进行当前规则的执行,直接返回当前实例的状态作为覆盖状态,

public class DownOrStartingRule implements InstanceStatusOverrideRule {

    @Override
    public StatusOverrideResult apply(InstanceInfo instanceInfo,
                                      Lease<InstanceInfo> existingLease,
                                      boolean isReplication) {
        /**
         * 若实例状态不处于运行中、暂停服务,(启动中、下线),则不适合提供服务,不匹配
         */
        if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
                && (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
            logger.debug("Trusting the instance status {} from replica or instance for instance {}",
                    instanceInfo.getStatus(), instanceInfo.getId());
            return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
        }
        return StatusOverrideResult.NO_MATCH;
    }

二、覆盖状态存在规则(OverrideExistsRule):若当前实例状态状态覆盖的数据,则使用已存在的覆盖状态作为当前覆盖状态

三、续约存在规则(LeaseExistsRule):非Server请求时,匹配已存在租约的应用实例的?nstanceStatus.OUT_OF_SERVICE?或者?InstanceInfo.InstanceStatus.UP?状态?

四、默认规则(AlwaysMatchInstanceStatusRule):总是返回当前实例的状态,来作为覆盖状态。

super(serverConfig, clientConfig, serverCodecs)

接着看下super里做了些什么,首先是赋值属性,创建最近取消、注册队列,定时任务设置getDeltaRetentionTask()的线程以配置的时间间隔(默认30秒)来定时执行。而getDeltaRetentionTask的run()方法,是遍历最近改变的队列信息,若队列内实例更新时间超过当前时间一定的时间段(默认三分钟),则从最近改变队列中移除,当client端发起对注册信息增量获取时,recentlyChangedQueue被用来计算最近时间的增量,返回给client端。

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;
        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        /**
         * 30秒执行一次清理工作
         */
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }

private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    // 将更新时间超过当前3分钟的数据移出队列
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }

后续是创建PeerEurekaNodes(Eureka-Server 集群节点集合)、创建EurekaServerContext(Eureka-Server上下文),将上下文放入holder方便获取,紧接着到了初始化上下文的节点,重点看下这个

初始化EurekaServerContext

初始化的代码,主要是做了两件事,启动刚刚创建好的server集群节点集合、初始化应用实例信息注册表,一个个来看。

public void initialize() {
        logger.info("Initializing ...");
        // 启动 Eureka-Server 集群节点集合(集群复制)
        peerEurekaNodes.start();
        try {
            // 初始化 应用实例信息的注册表
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
    }

启动Server集群节点集合

start():首先是创建一个定时任务,紧接着更新集群节点信息,创建一个实现run方法的类,run()的代码主要起到更新集群节点信息的作用,接着将该类交由定时任务每隔一定时间执行一次。这样就保证了初始化时,获取到server的集群信息,并每间隔一段时间发起请求去更新本地注册表server实例的用途。

public void start() {
        // 创建定时任务
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // 初始化集群节点信息
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 更新集群节点信息
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            /**
             * 定时任务设置执行间隔(默认10分钟)
             */
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

下面主要看下,server是如何初始化、更新集群节点信息的

resolvePeerUrls():得到当前实例信息、可见区,然后通过getDiscoveryServiceUrls()根据DNS或者配置信息解析出所有的server服务URLS,除去当前server实例信息,就是其他所有的server服务。

protected List<String> resolvePeerUrls() {
        /**
         * 获取当前实例信息、可见区
         */
        InstanceInfo myInfo = applicationInfoManager.getInfo();
        String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
        // 获取 eureka 客户端与之对话的所有 eureka 服务 url 的列表
        List<String> replicaUrls = EndpointUtils
                .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));

        /**
         * 去除本身URL,剩余是需要同步的
         */
        int idx = 0;
        while (idx < replicaUrls.size()) {
            if (isThisMyUrl(replicaUrls.get(idx))) {
                replicaUrls.remove(idx);
            } else {
                idx++;
            }
        }
        return replicaUrls;
    }

public static List<String> getDiscoveryServiceUrls(EurekaClientConfig clientConfig, String zone, ServiceUrlRandomizer randomizer) {
        /**
         * 根据DNS或者配置信息解析出所有server服务urls
         */
        boolean shouldUseDns = clientConfig.shouldUseDnsForFetchingServiceUrls();
        if (shouldUseDns) {
            return getServiceUrlsFromDNS(clientConfig, zone, clientConfig.shouldPreferSameZoneEureka(), randomizer);
        }
        return getServiceUrlsFromConfig(clientConfig, zone, clientConfig.shouldPreferSameZoneEureka());
    }

更新集群节点集合

上面已经得到最新的server节点集合,之前的server节点除去所有新server,剩余的就是要下线的server,最新server除去所有之前server节点集合,就是本次新增的server节点集合。

若本次没有新增、下线server节点,不进行操作。若下线节点集合存在,shutdown下线的eurekaNode,关闭node的线程池(集群同步,下文讲解);若新增节点集合存在,创建node节点,并添加到临时变量newNodeList中,后续赋值为当前server集群节点集合。首次初始化时,就是通过解析出的server全为新增节点,来完成peerEurekaNodes集合的属性填充。

protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        if (newPeerUrls.isEmpty()) {
            logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
            return;
        }

        /**
         * 计算出需要下线、新增的服务列表
         */
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);

        /**
         * 无需要下线、新增的,直接返回
         */
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }

        // Remove peers no long available
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);

        /**
         * 下线删除
         */
        if (!toShutdown.isEmpty()) {
            logger.info("Removing no longer available peer nodes {}", toShutdown);
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }

        /**
         * 新增添加到peerEurekaNodes
         */
        // Add new peers
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }

        this.peerEurekaNodes = newNodeList;
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }

初始化 应用实例信息的注册表

根据刚刚创建好的集群节点集合去初始化注册信息:每一步的作用下面已有注释,下面分析下主要的代码

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        // 开始测速类定时任务,每分钟清空,实现测速
        this.numberOfReplicationsLastMin.start();
        // 属性赋值
        this.peerEurekaNodes = peerEurekaNodes;
        // 初始化响应缓存
        initializedResponseCache();
        // 通过定时任务,间隔一定时间,更新续约阈值
        scheduleRenewalThresholdUpdateTask();
        // 初始化远程的server注册信息
        initRemoteRegionRegistry();

        try {
            // 监控注册
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
        }
    }

初始化响应缓存

1、通过CacheBuilder创建默认1000大小,180秒过期的缓存数据

2、若当前实例允许使用readonly的缓存,那么开启定时任务,每隔30秒更新readOnlyCacheMap的数据,实现是遍历readOnlyCacheMap若value值和readWriteCacheMap的value值不同,则将readWriteCacheMap中最新的value值赋值到readOnly缓存中去,这样就保证了readOnly的数据不会存在太久的脏数据。

ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        // guava缓存 默认容量1000,180秒过期
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });

        if (shouldUseReadOnlyResponseCache) {
            // 初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒
            // 负责更新readwriterMap中的数据
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }

readOnly数据的填充,是当其他服务发起全量、增量、应用信息获取时,通过响应缓存读取数据,若readOnlyMap存在缓存则,直接返回,否则读取readWriterCacheMap,并将返回值赋值到readOnlyMap中,避免对readWriterCacheMap的频繁调用。

Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                // 先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    // 过期后,重新设置(默认180秒过期一次)
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                // 读取 readWriteCacheMap
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return payload;
    }

那么为什么要这样操作,避免对readWriterCacheMap频繁调用呢,readWriterCacheMap已经是记录三分钟、1000容量的缓存,当未命中到缓存时,需要通过generatePayload()方法,来获取到具体数据,具体代码见下面代码:

 .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                Value value = generatePayload(key);
                                return value;
                            }
                        });

// 服务有效负载
private Value generatePayload(Key key) {
        Stopwatch tracer = null;
        try {
            String payload;
            switch (key.getEntityType()) {
                case Application:
                    boolean isRemoteRegionRequested = key.hasRegions();

                    if (ALL_APPS.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeAllAppsWithRemoteRegionTimer.start();
                            payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        } else {
                            // 全量获取
                            tracer = serializeAllAppsTimer.start();
                            // 根据注册的实例集合和key构造需要缓存的实例数据
                            payload = getPayLoad(key, registry.getApplications());
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        // 增量(待完成)
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } else {
                        tracer = serializeOneApptimer.start();
                        payload = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    tracer = serializeViptimer.start();
                    payload = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    payload = "";
                    break;
            }
            return new Value(payload);
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    }

根据不同的请求类型,来完成相应操作,类型较多,只以全量为例进行解析,其余实现类似:getPayLoad():通过server解码器,将数据解码为String,返回给client,忽略。getApplicationsFromMultipleRegions():增加监控数据,可忽略,后续读取当前server的注册表信息,将租约实例信息添加到临时变量apps中,此时当前server的所有实例信息已经得到,但是当前server可能因为网络原因,本地的注册表并不一定会是最全最新的注册信息,因此将本地缓存的其他server的注册表信息(server集群同步时缓存,下文讲解)也添加到apps中,循环远程注册表缓存数据,apps获取该app名称,若不存在,创建app信息,并添加所有实例,因是map、set数据结构来接收实例信息,所以不会存在多个实例重复注册后,数据重复的情况。得到所有apps后,所有实例状态及状态对应数据放入map中,通过hashmap的自然排序,生成应用集合的hashcode,hashcode例子:DOWN_2_UP_8_。

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

        boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

        logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                includeRemoteRegion, remoteRegions);
        // 增加对应指令的监控数据
        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
        } else {
            GET_ALL_CACHE_MISS.increment();
        }
        // 获取当前server的所有应用实例集合
        Applications apps = new Applications();
        apps.setVersion(1L);
        for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
            Application app = null;

            if (entry.getValue() != null) {
                for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
                    Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
                    if (app == null) {
                        app = new Application(lease.getHolder().getAppName());
                    }
                    app.addInstance(decorateInstanceInfo(lease));
                }
            }
            if (app != null) {
                apps.addApplication(app);
            }
        }
        // 将其他活跃的server注册数据,也依次添加进集合中
        if (includeRemoteRegion) {
            for (String remoteRegion : remoteRegions) {
                // 猜测:应该是集群同步时,有其他集群的实例信息缓存?
                RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                if (null != remoteRegistry) {
                    Applications remoteApps = remoteRegistry.getApplications();
                    for (Application application : remoteApps.getRegisteredApplications()) {
                        if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                            logger.info("Application {}  fetched from the remote region {}",
                                    application.getName(), remoteRegion);

                            Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
                            if (appInstanceTillNow == null) {
                                appInstanceTillNow = new Application(application.getName());
                                apps.addApplication(appInstanceTillNow);
                            }
                            for (InstanceInfo instanceInfo : application.getInstances()) {
                                appInstanceTillNow.addInstance(instanceInfo);
                            }
                        } else {
                            logger.debug("Application {} not fetched from the remote region {} as there exists a "
                                            + "whitelist and this app is not in the whitelist.",
                                    application.getName(), remoteRegion);
                        }
                    }
                } else {
                    logger.warn("No remote registry available for the remote region {}", remoteRegion);
                }
            }
        }
        // 设置 应用集合hashcode,可以用来后续匹配校验是否变更过(该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ))
        apps.setAppsHashCode(apps.getReconcileHashCode());
        return apps;
    }

更新续约阈值:

跳出来,接着说初始化注册表,接下来会通过定时任务,更新续约阈值。

通过定时任务,每隔十五分钟执行一次,进行重置当前server的自我保护阈值。首先是获取当前server的所有实例数值,当数据大于 预期客户端数*0.85,或者未开启自我自我保护机制时,更新预期客户端数值,变更每分钟续约最小阈值。当开启保护机制,若运行活跃实例数据小于预期客户端*0.85时,不进行操作,此处也是server自我保护机制的实现,server任务该时间段内过多服务下线,server自动进行自我保护机制。不修改预期客户端数值、每分钟续约阈值,那么当驱逐实例定时任务运行时,存在其他服务续约过期,server判断实例数过小时,保护当前注册表信息,不会进行驱逐操作(下面分析)。

 private void scheduleRenewalThresholdUpdateTask() {
        timer.schedule(new TimerTask() {
                           @Override
                           public void run() {
                               updateRenewalThreshold();
                           }
                       }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
                serverConfig.getRenewalThresholdUpdateIntervalMs());
    }

private void updateRenewalThreshold() {
        try {
            // 计算 应用实例数
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            // 若count>上次实例数*0.85,(server没进入自我保护机制)或者未开启自动保护时,更新实例数
            // 进入自我保护机制后,会保护目前注册表的实例
            synchronized (lock) {
                // Update threshold only if the threshold is greater than the
                // current expected threshold or if self preservation is disabled.
                if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                        // 未开启 自我保护机制配置
                        || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfClientsSendingRenews = count;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
    }

未完明日更新...

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-12-28 22:59:51  更:2021-12-28 23:01:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 12:59:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码