IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Eureka Client 源码解析 -> 正文阅读

[大数据]Eureka Client 源码解析

一、Client 端基本信息

在老版本的 Spring Cloud 的使用中,当构建客户端的时候,会在启动类上标注 @EnableDiscoveryClient 注解,但是在新版本的时候,便不需再标注了,这是因为 Spring Boot 中的自动配置机制不同的原因,这里暂不做论述。那客户端是如何做自动装配的呢?Spring Boot 自身的注解 @SpringBootApplication 本身就是一个组合注解,其中之一的 @EnableAutoConfiguration。

注:本系列文章 spring boot 版本基于:2.5.6,spring cloud 基于 2020.0.4。

Spring Boot 的 SPI 在每次项目启动时会去加载 /META-INF/spring.factories 文件,内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration

org.springframework.boot.Bootstrapper=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapper

从上面配置文件是key、value的形式的,在其内容中有这样的两个value:EurekaDiscoveryClientConfiguration(Eureka 服务发现自动装配类)、EurekaClientAutoConfiguration(Eureka 客户端自动配置类),那么可以看看 EurekaDiscoveryClientConfiguration 这个类的信息:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
public class EurekaDiscoveryClientConfiguration {

   @Bean
   @ConditionalOnMissingBean
   public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
      return new EurekaDiscoveryClient(client, clientConfig);
   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
   protected static class EurekaHealthCheckHandlerConfiguration {

      @Autowired(required = false)
      private StatusAggregator statusAggregator = new SimpleStatusAggregator();

      @Bean
      @ConditionalOnMissingBean(HealthCheckHandler.class)
      public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
         return new EurekaHealthCheckHandler(this.statusAggregator);
      }

   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnClass(RefreshScopeRefreshedEvent.class)
   protected static class EurekaClientConfigurationRefresher
         implements ApplicationListener<RefreshScopeRefreshedEvent> {

      @Autowired(required = false)
      private EurekaClient eurekaClient;

      @Autowired(required = false)
      private EurekaAutoServiceRegistration autoRegistration;

      public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
         // This will force the creation of the EurkaClient bean if not already created
         // to make sure the client will be reregistered after a refresh event
         if (eurekaClient != null) {
            eurekaClient.getApplications();
         }
         if (autoRegistration != null) {
            // register in case meta data changed
            this.autoRegistration.stop();
            this.autoRegistration.start();
         }
      }

   }

}

上面代码的信息很显而易见,他包含EurekaDiscoveryClient、健康检查(EurekaHealthCheckHandlerConfiguration)、客户端配置刷新(EurekaClientConfigurationRefresher)。这里我们暂且只关注 EurekaDiscoveryClient。如下:

public class EurekaDiscoveryClient implements DiscoveryClient {

   public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client";

   private final EurekaClient eurekaClient;

   private final EurekaClientConfig clientConfig;

   public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) {
      this.clientConfig = clientConfig;
      this.eurekaClient = eurekaClient;
   }
   // ...........
}

在 EurekaDiscoveryClient 的构造函数中有:EurekaClient、EurekaClientConfig,且 EurekaDiscoveryClient 继承了DiscoveryClient接口,这个接口便是服务发现的一个重要的接口。

这里还需要看一下EurekaClientAutoConfiguration这个类,因为这个类是负责Eureka Client 关键 Bean 的配置和初始化的,如ApplicationInfoManager、EurekaClientConfig等。如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = { "org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration",
      "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
      "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
      "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {

   private ConfigurableEnvironment env;

   public EurekaClientAutoConfiguration(ConfigurableEnvironment env) {
      this.env = env;
   }

   @Bean
   public HasFeatures eurekaFeature() {
      return HasFeatures.namedFeature("Eureka Client", EurekaClient.class);
   }

   @Bean
   @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
   public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
      return new EurekaClientConfigBean();
   }

   @Bean
   @ConditionalOnMissingBean
   public ManagementMetadataProvider serviceManagementMetadataProvider() {
      return new DefaultManagementMetadataProvider();
   }

   private String getProperty(String property) {
      return this.env.containsProperty(property) ? this.env.getProperty(property) : "";
   }

   @Bean
   @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
   public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils,
         ManagementMetadataProvider managementMetadataProvider) {
      String hostname = getProperty("eureka.instance.hostname");
      boolean preferIpAddress = Boolean.parseBoolean(getProperty("eureka.instance.prefer-ip-address"));
      String ipAddress = getProperty("eureka.instance.ip-address");
      boolean isSecurePortEnabled = Boolean.parseBoolean(getProperty("eureka.instance.secure-port-enabled"));

      String serverContextPath = env.getProperty("server.servlet.context-path", "/");
      int serverPort = Integer.parseInt(env.getProperty("server.port", env.getProperty("port", "8080")));

      Integer managementPort = env.getProperty("management.server.port", Integer.class);

      String managementContextPath = env.getProperty("management.server.servlet.context-path");
      if (!StringUtils.hasText(managementContextPath)) {
         managementContextPath = env.getProperty("management.server.base-path");
      }

      Integer jmxPort = env.getProperty("com.sun.management.jmxremote.port", Integer.class);
      EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);

      instance.setNonSecurePort(serverPort);
      instance.setInstanceId(getDefaultInstanceId(env));
      instance.setPreferIpAddress(preferIpAddress);
      instance.setSecurePortEnabled(isSecurePortEnabled);
      if (StringUtils.hasText(ipAddress)) {
         instance.setIpAddress(ipAddress);
      }

      if (isSecurePortEnabled) {
         instance.setSecurePort(serverPort);
      }

      if (StringUtils.hasText(hostname)) {
         instance.setHostname(hostname);
      }
      String statusPageUrlPath = getProperty("eureka.instance.status-page-url-path");
      String healthCheckUrlPath = getProperty("eureka.instance.health-check-url-path");

      if (StringUtils.hasText(statusPageUrlPath)) {
         instance.setStatusPageUrlPath(statusPageUrlPath);
      }
      if (StringUtils.hasText(healthCheckUrlPath)) {
         instance.setHealthCheckUrlPath(healthCheckUrlPath);
      }

      ManagementMetadata metadata = managementMetadataProvider.get(instance, serverPort, serverContextPath,
            managementContextPath, managementPort);

      if (metadata != null) {
         instance.setStatusPageUrl(metadata.getStatusPageUrl());
         instance.setHealthCheckUrl(metadata.getHealthCheckUrl());
         if (instance.isSecurePortEnabled()) {
            instance.setSecureHealthCheckUrl(metadata.getSecureHealthCheckUrl());
         }
         Map<String, String> metadataMap = instance.getMetadataMap();
         metadataMap.computeIfAbsent("management.port", k -> String.valueOf(metadata.getManagementPort()));
      }
      else {
         if (StringUtils.hasText(managementContextPath)) {
            instance.setHealthCheckUrlPath(managementContextPath + instance.getHealthCheckUrlPath());
            instance.setStatusPageUrlPath(managementContextPath + instance.getStatusPageUrlPath());
         }
      }

      setupJmxPort(instance, jmxPort);
      return instance;
   }

   private void setupJmxPort(EurekaInstanceConfigBean instance, Integer jmxPort) {
      Map<String, String> metadataMap = instance.getMetadataMap();
      if (metadataMap.get("jmx.port") == null && jmxPort != null) {
         metadataMap.put("jmx.port", String.valueOf(jmxPort));
      }
   }

   @Bean
   public EurekaServiceRegistry eurekaServiceRegistry() {
      return new EurekaServiceRegistry();
   }

   @Bean
   @ConditionalOnBean(AutoServiceRegistrationProperties.class)
   @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
   public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context,
         EurekaServiceRegistry registry, EurekaRegistration registration) {
      return new EurekaAutoServiceRegistration(context, registry, registration);
   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnMissingRefreshScope
   protected static class EurekaClientConfiguration {

      @Autowired
      private ApplicationContext context;

      @Autowired
      private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

      @Bean(destroyMethod = "shutdown")
      @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
      public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
         return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
      }

      @Bean
      @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
      public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
         InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
         return new ApplicationInfoManager(config, instanceInfo);
      }

      @Bean
      @ConditionalOnBean(AutoServiceRegistrationProperties.class)
      @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
      public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
            @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
         return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
               .with(healthCheckHandler).build();
      }

   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnRefreshScope
   protected static class RefreshableEurekaClientConfiguration {

      @Autowired
      private ApplicationContext context;

      @Autowired
      private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

      @Bean(destroyMethod = "shutdown")
      @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
      @org.springframework.cloud.context.config.annotation.RefreshScope
      @Lazy
      public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config,
            EurekaInstanceConfig instance, @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
         ApplicationInfoManager appManager;
         if (AopUtils.isAopProxy(manager)) {
            appManager = ProxyUtils.getTargetObject(manager);
         }
         else {
            appManager = manager;
         }
         CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
               this.context);
         cloudEurekaClient.registerHealthCheck(healthCheckHandler);
         return cloudEurekaClient;
      }

      @Bean
      @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
      @org.springframework.cloud.context.config.annotation.RefreshScope
      @Lazy
      public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
         InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
         return new ApplicationInfoManager(config, instanceInfo);
      }

      @Bean
      @org.springframework.cloud.context.config.annotation.RefreshScope
      @ConditionalOnBean(AutoServiceRegistrationProperties.class)
      @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
      public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
            CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager,
            @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
         return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager).with(eurekaClient)
               .with(healthCheckHandler).build();
      }
   }

   @Target({ ElementType.TYPE, ElementType.METHOD })
   @Retention(RetentionPolicy.RUNTIME)
   @Documented
   @Conditional(OnMissingRefreshScopeCondition.class)
   @interface ConditionalOnMissingRefreshScope {
   }

   @Target({ ElementType.TYPE, ElementType.METHOD })
   @Retention(RetentionPolicy.RUNTIME)
   @Documented
   @ConditionalOnClass(RefreshScope.class)
   @ConditionalOnBean(RefreshAutoConfiguration.class)
   @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "true", matchIfMissing = true)
   @interface ConditionalOnRefreshScope {
   }

   private static class OnMissingRefreshScopeCondition extends AnyNestedCondition {

      OnMissingRefreshScopeCondition() {
         super(ConfigurationPhase.REGISTER_BEAN);
      }

      @ConditionalOnMissingClass("org.springframework.cloud.context.scope.refresh.RefreshScope")
      static class MissingClass {
      }

      @ConditionalOnMissingBean(RefreshAutoConfiguration.class)
      static class MissingScope {
      }

      @ConditionalOnProperty(value = "eureka.client.refresh.enable", havingValue = "false")
      static class OnPropertyDisabled {
      }
   }

   @Configuration(proxyBeanMethods = false)
   @ConditionalOnClass(Health.class)
   protected static class EurekaHealthIndicatorConfiguration {
      @Bean
      @ConditionalOnMissingBean
      @ConditionalOnEnabledHealthIndicator("eureka")
      public EurekaHealthIndicator eurekaHealthIndicator(EurekaClient eurekaClient,
            EurekaInstanceConfig instanceConfig, EurekaClientConfig clientConfig) {
         return new EurekaHealthIndicator(eurekaClient, instanceConfig, clientConfig);
      }
   }
}

总结以上的信息,从EurekaClientAutoConfiguration等方面可罗列出如下几个比较重要的类,如下:

类名

介绍与作用

EurekaClientConfig

封装了Eureka Client 与 Eureka Server 交互时所需要的配置信息,Spring Cloud 为其提供了默认配置类: EurekaClientConfigBean。

ApplicationInfoManager

作为应用信息管理器,管理服务实例类 Instancenfo 和服务实例配置信息类EurekaInstanceConfig。

InstanceInfo

封装了将被发送到 Eureka Server 进行服务注册的服务实例元数据,它在Eureka 注册表中代表着一个服务实例,其他服务可通过 InstanceInfo来了解该服务实例的相关信息,从而进行相关操作。

EurekaInstanceConfig

封装了 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo,通常这些信息在配置文件的 eureka.instance 前缀下进行设置,Spring Cloud 通过 EurekaInstanceBean 配置类提供默认配置。

DiscoveryClient

Spring Cloud中定义用来做服务发现的客户端接口。

上面的信息中 DiscoveryClient 是很重要的信息,毕竟它是 Spring Cloud 的顶级接口:

public interface DiscoveryClient extends Ordered {

   int DEFAULT_ORDER = 0;
   // 获取实现类描述
   String description();
   
   // 通过服务 ID 获取服务实例信息
   List<ServiceInstance> getInstances(String serviceId);

   // 获取服务实例 ID
   List<String> getServices();

   default void probe() {
      getServices();
   }

   @Override
   default int getOrder() {
      return DEFAULT_ORDER;
   }
}

上面有提到 EurekaDiscoveryClient 类中组合了 EurekaClient 接口,而它的实现类是 DiscoveryClient ,CloudEurekaClient又继承了 DiscoveryClient,这里 DiscoveryClient 类中提供了服务的注册、续约、下线和获取注册信息等功能,而它们的UML关系图如下:

二、DiscoveryClient类的解析

1、DiscoveryClient 作用

DiscoveryClient 是Eureka Client 的核心类,其作用与下:

  • 注册实例到 Eureka Server 中
  • 发送心跳更新与 Eureka Server 的续约
  • 在服务关闭时取消与 Eureka Server 的续约,完成服务下限
  • 获取在 Eureka Server 中的服务实例列表

2、DiscoveryClient 的类结构

可以先看下 DiscoveryClient 的类结构图:

从类结构图上可以看出 DiscoveryClient 类实现了 EurekaCient,EurekaCient 又继承了LookupService,前面已介绍过EurekaCient,这里不再赘述,这里看看 LookupService 类:

public interface LookupService<T> {
    // 根据服务实例名称获取 Application
    Application getApplication(String appName);
    // 获取当前注册表中所有的服务实例信息
    Applications getApplications();
    // 根据服务实例 Id 获取服务实例信息
    List<InstanceInfo> getInstancesById(String id);

    InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}

Application 是持有服务实例信息列表,它表示同一个服务的集群信息,这些服务实例乃是挂载在同一个服务名 appName 之下,而 InstanceInfo 则是代表着一个服务实例的信息,Application 类代码如下:

public class Application {
    
    private static Random shuffleRandom = new Random();
    // 服务名
    private String name;
    // 标识服务状态
    @XStreamOmitField
    private volatile boolean isDirty = false;

    @XStreamImplicit
    private final Set<InstanceInfo> instances;

    private final AtomicReference<List<InstanceInfo>> shuffledInstances;

    private final Map<String, InstanceInfo> instancesMap;
    
    // ........
}

在 Application 中对 InstanceInfo 的操作都是同步的,为的是保证其原子性。Applications 则是注册表中所有服务实例的集合,其间的操作也都是同步的。EurekaClient 继承了 LookupService 接口,为 DiscoveryClient 提供一个上层接口,其目的是为了Eureka1.0x 到 Eureka2.x 的升级做过渡渡。

EurekaCient 接口在 LookupService 的基础上提供了更丰富的方法,譬如:

  • 提供做种方式获取 InstanceInfo,例如根据区域、Eureka Server 地址获取等。
  • 提供本地客户端(区域、可用区)的数据,这部分与 AWS 相关
  • 提供了为客户端注册和获取健康检查处理器的功能

除了相关查询接口外,EurekaClient 提供以下的两个方法,需颇多关注:

public interface EurekaClient extends LookupService {
    // .......
    // 为 Eureka Client 注册健康处理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
    // 监听 Client 服务实例信息的更新
    public void registerEventListener(EurekaEventListener eventListener);
}

在 Eureka Server 中一般是通过心跳来识别一个实例的状态,而在 Eureka Client 中泽存在一个定时任务定时通过 HealthCheckHandler 检测当前 Client 的状态,当 其状态发生变化的时候,将会触发新的注册事件,更新 Eureka Server 的注册表中的相关实例信息。

3、DiscoveryClient 构造函数

在 DiscoveryClient 的构造函数中,会有如下操作,如:服注册表信息、服务注册、初始化发送心跳、缓存刷新、注册定时任务等。因此 DiscoveryClient 的构造函数贯穿了 Eureka Client 启动阶段的各项任务。

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    // 省略相关信息
}

在DiscoveryClient 的构造函数中有如下几个参数:ApplicationInfoManager、EurekaClientConfig、AbstractDiscoveryClientOptionalArgs、Provider<BackupRegistry>、EndpointRandomizer。前两个参数前面已做介绍,AbstractDiscoveryClientOptionalArgs 用于注入一些可选参数,BackupRegistry则充当备份注册中心的职责,EndpointRandomizer 则是作为端点随机器。对DiscoveryClient 的构造函数的职责做一个简单概括:

  • 相关配置赋值,如ApplicationInfoManager、EurekaClientConfig等
  • 备份注册中心初始化,默认没有实现
  • 拉去 Eureka Server 注册表信息
  • 注册前预处理
  • 向 Eureka Server 注册自身
  • 初始化定时任务、缓存刷新、按需注册定时任务

后面将会对这些步骤中对重要点进行相关分析。

三、拉取注册信息

1、关于DiscoveryClient#fetchRegistry方法

在DiscoveryClient 的构造函数调用了DiscoveryClient#fetchRegistry 方法,其方法作用是从 Eureka Server 中拉去注册表信息,方法代码如下:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 如果增量拉去被禁止或者 Applications 为 null,则进行全量拉取
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // 省略日志打印部分
            // 全量拉取注册表信息
            getAndStoreFullRegistry();
        } else {
            // 增量拉去注册表信息
            getAndUpdateDelta(applications);
        }
        // 计算应用一致性哈希码
        applications.setAppsHashCode(applications.getReconcileHashCode());
        // 打印注册表上所有服务实例的总数量
        logTotalInstances();
    } catch (Throwable e) {
        logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
                appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // 更新远程实例状态之前推送缓存刷新事件,但是 Eureka 中并没有提供默认事件监听器
    onCacheRefreshed();

    // 基于缓存中被刷新的数据更新远程实例状态
    updateInstanceRemoteStatus();

    // 如果拉取成功,则返回 true
    return true;
}

在 Eureka 客户端,一般第一次全量拉取注册表信息之后,之后一般都只会尝试增量拉取,下面将解析一下全量拉取和增量拉取。

2、全量拉取注册表信息

一般只有第一次拉取的时候才会进行全量拉取,其调用方法是DiscoveryClient#getAndStoreFullRegistry,代码如下:

private void getAndStoreFullRegistry() throws Throwable {
    // 拉取注册表信息的版本,以此防止拉取版本落后(由其他线程引起)
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");
    Applications apps = null;
    
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    // 获取成功
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    // 检查 fetchRegistryGeneration 的更新版本是否发生变化,如果没有的话,则说明本次拉取的是最新的    
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 从 apps 中筛选状态为 UP 的实例,同时打乱实例的顺序防止同一个服务的不同实例在启动时接收流量
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

3、增量拉取注册表信息

增量拉取注册表信息,一般都发生在第一次之后,拉取的信息定为某一段时间之后发生的所有变更信息,通常是3分钟之内注册表信息发生变化。在获取到新的数据之后,会根据增量更新的数据对本地注册表进行更新。和全量拉取一样,也是通过fetchRegistryGeneration对更新的版本进行控制。增量拉取式为了保持Client 端与 Server 端注册表信息的数据一致性。Client中有一个注册表缓存刷新器,用来专门负责定时维护两者之间信息的同步。但是当增量拉取出现意外时,定时器将会全量拉取来更新本地缓存的注册表信息,具体代码如下:

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    
    // 增量拉取失败
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        // 进行全量拉取
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 更新本地缓存
                updateDelta(delta);
                // 计算应用一致性哈希码
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        // 比较应用集合一致性哈希码,如果不一致将任务本次拉取的数据为脏数据,将发进行全量拉取并更新到本地
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

当增量拉取成功后,将会调用DiscoveryClient#updateDelta方法,在本方法中将会涉及ActionType类,此类是定义实例状态,而在DiscoveryClient#updateDelta方法中会根据InstanceInfo.ActionType状态的不同,来做不同的操作,DiscoveryClient#updateDelta代码如下:

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            // 若状态为 ADDED,则表示添加 Eureka Server
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    // 添加到本地实例
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            // 若状态为 MODIFIED,则表明 Eureka Server 中的注册表信息发生改变
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());
                // 添加到本地实例
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            // 若状态为 DELETED,则表明服务实例已从 Eureka Server 中被剔除
            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    // 从本地注册表中剔除
                    existingApp.removeInstance(instance);
                    /*
                     * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                     * if instance list is empty, we remove the application.
                     */
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

四、服务注册

当拉取完Eureka Server中到注册表信息并缓存到本地后,Eureka Client 将向 Eureka Server 中注册自身的服务实例元数据,调用方法为 DiscoveryClient#register ,其代码如下:

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

当 EurekaHttpClient#register 调用成功后,httpResponse返回状态码则为 204。

五、初始化定时任务

服务注册其实是一个持续的过程,Eureka Client 需定时发送心跳的方式与 Eureka Server 进行通信,来维持在 Eureka Server 上的租约。同时 Eureka Server 注册表中的服务实例信息是动态变化的,为了保持 Eureka Client 与Eureka Server的注册表信息一直,Eureka Client 需定时向 Eureka Server拉取注册表信息并缓存至本地。为了监测 Eureka Client 应用信息和状态变化,Eureka Client 设置了一个定时器,定时检查应用信息和状态变化,当发生变化时向 Eureka Server 重新注册,避免注册表信息与本地不一致,避免注册表中的信息不可用。

在 DiscoveryClient#initScheduledTasks 方法中初始化了三个定时任务,分别为:用于向 Eureka Client 发送心跳、向 Eureka Server 拉取注册表信息并刷新本地缓存、用于按需注册的操作,代码如下:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 注册表刷新定时器
        // 获取配置文件中的刷新间隔,默认为 30s,可以通过 eureka.client.registry-fetch-interval-seconds进行设置
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    // 如果需要注册
    if (clientConfig.shouldRegisterWithEureka()) {
        // 发送心跳定时器,默认 30s发送一次心跳
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        // 心跳定时器
        // Heartbeat timer
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // 按需注册定时器
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                logger.info("Saw local status change event {}", statusChangeEvent);
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

1、刷新缓存定时任务和发送心跳定时任务

在 DiscoveryClient#initScheduledTasks 方法中,通过ScheduledExecutorService#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit) 的方式提交缓存刷新、发送心跳任务,任务执行的方式为延时执行并且不循环,这两个定时任务循环逻辑由 TimedSupervisorTask 提供实现。TimedSupervisorTask 继承了TimeTask,提供执行定时任务的功能,它的主要逻辑在其 run 方法中,代码如下:

public class TimedSupervisorTask extends TimerTask {
    
    // 省略部分代码
    
    @Override
    public void run() {
        Future<?> future = null;
        try {
             // 执行任务
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 等待执行结果
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // 执行完成,设置下次执行任务的频率(时间间隔)
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            // 执行超时
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            // 设置下次执行任务的频率(时间间隔)
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
            // 执行任务被拒绝,并统计被拒绝的次数
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
            
            throwableCounter.increment();
        } finally {
            // 取消未结束任务
            if (future != null) {
                future.cancel(true);
            }
            // 如果定时任务服务关闭,定义下一次任务
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
    // 省略部分代码
}

run方法的调用逻辑大致如下:

  1. scheduler 初始化并延迟执行 TimedSupervisorTask
  2. TimedSupervisorTask 将 task 提交 executor 中执行,task 和 executor 在初始化TimedSupervisorTask传入
    1. 若 task 执行正常 TimedSupervisorTask 将自己提交到 scheduler 中,延迟 delay时间间隔后再执行
    2. 若 task 执行超时,计算新的 delay 时间(不超过 maxDelay),然后 TimedSupervisorTask 将自己提交到scheduler 中,延迟 delay时间间隔后再执行

TimedSupervisorTask 通过这种不断循环提交任务的方式,完成定时任务的要求。

在 DiscoveryClient#initScheduledTasks 中提交缓存刷新定时任务为 CacheRefreshThread线程,提交发送心跳定时任务线程为 HeartbeatThread线程。CacheRefreshThread 继承 Runnable接口,其主要代码如下:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        // 判断远程 Regions 是否有变化(即 Eureka Server 地址是否发生变化)决定进行全量拉取还是增量拉取
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        // 打印更新注册缓存后的变化
        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}

CacheRefreshThread 线程任务将委托 DiscoveryClient#fetchRegistry 方法进行缓存刷新的具体操作。

HeartbeatThread 同样继承 Runnable 接口,给任务向 Eureka Server 发送心跳请求,维持 Eureka Client 在注册表中的租约,代码如下:

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 调用 HTTP 发送心跳到 Eureka Server 中维持租约
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // Eureka Server 中不存在应该应用实例
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // 续约成功
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

Eureka Server 会根据续约提交到 appName 与 instanceInfoId 来更新注册表中的服务实例的租约。当注册表中不存在该实例时,返回 404 状态码,发送心跳请求的 Eureka Client 在接收到 404 状态后将会重新发起注册,如果续约成功,才会返回200状态码。

2、按需注册定时任务

按需注册定时任务的作用时将 Eureka Client 中的 InstanceInfo 或者 status发生变化时,重新向Eureka Server 发起注册请求,更新注册表中的实例信息,保证 Eureka Server 注册表中的实例信息总是有效和可用的。代码如下:

// InstanceInfo replicator
// 定时检查刷新服务实例信息,检查是否有变化,是否需要重新注册
instanceInfoReplicator = new InstanceInfoReplicator(
        this,
        instanceInfo,
        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
        2); // burstSize

// 监控应用的 status 变化,发生变化即可发起重新注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        logger.info("Saw local status change event {}", statusChangeEvent);
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    // 注册应用状态改变监控器
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动定时按需注册定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

按需注册的代码罗基分为两部分,一部分是定义了一个定时任务,定时刷新服务实例的信息和检查应用状态的变化,在服务实例信息发生变化的情况下,向 Eureka Server 重新发起注册操作。另一部分则是注册了状态改变监控器,在应用状态发生变化时,刷新服务实例信息,在服务实例信息发生改变的情况下向 Eureka Server 重新发起注册操作。InstanceInfoReplicator中的定时任务逻辑位于其run方法中,代码如下:

public void run() {
    try {
        // 刷新了 InstanceInfo 中的服务实例信息
        discoveryClient.refreshInstanceInfo();
        // 如果数据发生改变,则返回其修改的时间
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // 注册服务实例
            discoveryClient.register();
            // 重置更新状态
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 执行下一个定时任务
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

DiscoveryClient#refreshInstanceInfo 方法乃是刷新本地服务实例信息和检查服务状态的变化,代码如下:

void refreshInstanceInfo() {
    // 刷新服务实例信息
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    // 更新租约信息
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        // 调用 HealthCheckHandler 检查服务实例的状态变化
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}

run 方法首先调用了 DiscoveryClient#refreshInstanceInfo 方法刷新当前的服务实例信息,检查当前服务实例信息和服务状态是否发生变化,如果当前服务实例信息或服务状态发生变化,将会向Eureka Server 重新发起服务注册操作。最后声明下一个延时任务,用于再次调用 run 方法,继续检查服务实例信息和服务状态变化,在服务状态变化实例发生变化的情况下重新发起注册。

如果 Eureka Client 状态发生变化(在Spring Boot 通过 Actuator 对服务状态进行监控,具体实现为 EurekaHealthCheckHandler),注册在 ApplicationInfoManager 的状态改变监控器将会被触发,从而调用InstanceInfoReplicator#onDemandUpdate方法,检查服务实例信息和服务状态的变化,可能会引起按需注册任务,代码如下:

public boolean onDemandUpdate() {
    // 控制流量,当超过限制时,不能进行按需更新
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    // 取消上次 run 任务
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

InstanceInfoReplicator#onDemandUpdate 方法中调用 InstanceInfoReplicator#run 方法检查服务实例信息和服务状态的变化,并在服务实例信息和服务状态发生变化的情况下向 Eureka Server 发起重新注册的请求,为了防止重新执行 run 方法,onDemandUpdate 方法还会取消执行上次已经提交且未完成的 run方法,执行最新的按需注册任务。

六、服务下线

在应用服务关闭的时候,Eureka Client 会主动向 Eureka Server 注销自身在注册表中的信息。DiscoveryClient 中对象销毁前执行的清理方法代码如下:

public synchronized void shutdown() {
    // 同步方法
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // 院子操作,确保只会执行一次
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // 取消定时任务
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            // 服务下线
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 关闭 Jerry 客户端
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // 关闭相关 Monitor
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}

在销毁 DiscoveryClient 前,会进行一系列清理工作,包括注销 ApplicationInfoManager 中的 StatusChangeListener、取消定时任务、服务下线方法,其代码如下:

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

再可看 AbstractJerseyEurekaHttpClient#cancel 方法中,可以发现服务下线调用的接口以及传递的参数,代码如下:

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP DELETE {}{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-18 11:15:08  更:2021-11-18 11:15:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 21:56:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码