IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Eureka 由浅入深解读,7W+篇幅,再也不想学下去了 -> 正文阅读

[Java知识库]Eureka 由浅入深解读,7W+篇幅,再也不想学下去了

  • J3 - 白起
  • 技术 # 笔记 # 小白学微服务 # Eureka

上篇我对微服务方案实现的其中之一SpringCloud进行了介绍,相信大家已经对它有个大致的了解了。👉《小白学微服务》之,什么是Spring Cloud Netflix

比如,SpringCloud不是一个技术,而是一个技术的集合体;再比如SpringCloud组件中的各个技术都是以SpringBoot为基础实现的等。

那既然知道了这些,我就开始介绍SpringCloud中的第一个组件了-Eureka

一、是什么

SpringCloud Eureka 是 SpringCloud Netflix 微服务套件的一部分,基于 Netflix Eureka 做了二次封装,主要负责实现微服务架构中的服务治理功能

SpringCloud Eureka 是一个基于 REST 的服务,并且提供了基于 Java 的客户端组件,能够非常方便地将服务注册到 SpringCloud Eureka 中进行统一管理。

在微服务架构体系中,服务治理是必不可少的。在服务治理中,服务注册中心是一定要有的,而除了用 Eureka 作为注册中心之外,还可以用,Consul、Etcd、Zookeeper 等来作为服务的注册中心。

当然,在常见的可以作为服务注册中心的组件中,Zookeeper 和 Eureka 一样都是使用的比较多的。那为什么SpringCloud不用 Zookeeper 当作注册中心而选用 Eureka呢!

这就不得不提一个在分布式领域中的著名定理CAP了。

1.1 CAP定理

CAP定理又称CAP原则,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),最多只能同时满足三个特性中的两个,三者不可兼得。

在这里插入图片描述

  • C:Consisteny(一致性),比如数据库是主从模式,一个写库请求进来了,master库完成了写入操作,但是再slave同步数据之前,另一个用户查了这条数据,结果没查到,但是也没报错,这就不是强一致性。虽然最终会同步成功,但这是最终一致性的体现。强一致性的体现在于我不管你因为什么没同步成功(可能网络延迟或其他等),只要没同步成功,我这个slave就不能对外提供服务。必须主从数据一致才可以提供服务。(很少有做到这点的)
  • A:Availability(可用性),还是上面的例子,就是保证了可用性。因为虽然主从没同步完成,但是我从库照样能提供服务而且及时响应结果。也就是说可用性保证服务可用,而不在乎数据是否一致。明显和C是冲突的,那CA怎么还能组合到一起?其实是可以的(单机部署)。
  • P:Partition Tolerance(分区容错性),集群部署了三台服务。挂了一台,其他两台还能继续对外提供服务,这时候我就认为他是没问题的,也就是我能容忍你挂了一台,只要还有服务能对外提供请求即可。所以一般分区容忍性是必须的,一般都需要从C和A之间做选择。

那么现在两两组合的话就有下面三种情况:

  1. CP:满足一致性和分区容忍性,常见的就是 Zookeeper 集群。

  2. AP:满足可用性和分区容忍性,创建的就是 Eureka 集群。

  3. CA:满足一致性和可用性,适用单机部署系统,扩展性不强。

至于在分布式微服务系统中如何抉择,就没有很好的定论了。所以在设计系统架构的时候,我们应该根据系统具体的业务场景来权衡CAP。

只有适合的才是最好的,切不可为了追求完美而浪费太多的时间精力在CAP的抉择上。

更多CAP内容可看扩展阅读一1

1.2 为什么SpringCloud注册中心选择Eureka

根据上面的CAP理论,我们知道 zookeeper 和 Eureka 在设计的时候就已经对分布式场景作出了一些取舍。

zookeeper的CP保证

当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟以前的注册信息,但不能接受服务直接down掉不可用。也就是说,服务注册功能对可用性的要求要高于一致性。但是zk会出现这样一种情况,当master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s,且选举期间整个zk集群都是不可用的,这就导致在选举期间注册服务瘫痪。在云部署的环境下,因网络问题使得zk集群失去master节点是较大概率会发生的事,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。

Eureka的AP保证

Eureka看明白了这一点,因此在设计时就优先保证可用性。Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。除此之外,Eureka还有一种自我保护机制,如果在15分钟内超过85%的节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,此时会出现以下几种情况:

  1. Eureka不再从注册列表中移除因为长时间没收到心跳而应该过期的服务
  2. Eureka仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上(即保证当前节点依然可用)
  3. 当网络稳定时,当前实例新的注册信息会被同步到其它节点中

结论:Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像Zookeeper那样使整个注册服务瘫痪。

1.3 Eureka架构

Eureka1.x版本架构图

在这里插入图片描述

从架构图中我先来说一下Eureka的分区

Eureka 提供了 Region(区域) 和 Availability Zone(可用区) 两个概念来进行分区,这两个概念均来自于亚马逊的 AWS:

region:可以理解为地理上的不同区域,比如亚洲地区,中国区或者深圳等等。没有具体大小的限制,根据项目具体的情况,可以自行合理划分 region。图中us-east-1c、us-east-1d、us-east-1e就是一个个不同的区域。

Availability Zone:可以简单理解为 region 内的具体机房,比如说 region 划分为深圳,然后深圳有两个机房,就可以在此 region 之下划分出 zone1、zone2 两个 zone。

了解分区之后,可以知道 Eureka 之间的行为非常之多,具体如下:

  • Application Service:由名字可知,它是一个服务的提供者,同时也是一个Eureka Client。作用,扮演服务提供角色,提供业务服务,向 Eureka Server 注册和更新自己的信息,同时能从 Eureka Server 注册表中获取到其它服务信息。
  • Eureka Server:扮演服务注册中心的角色,提供服务注册和发现的功能。每个 Eureka Client 向 Eureka Server 注册自己的信息,也可以通过 Eureka 获取到其它服务的信息达到发现和调用其它服务的目的。
  • Application Client:是一个 Eureka Client ,扮演服务消费者的角色,通过 Eureka Server 获取注册到其上的其它服务信息,从而根据信息找到所需的服务发起远程调用。
  • Replicate:Eureka Server 之间注册表信息的同步复制,使 Eureka Server 集群中不同注册表中的服务实例信息保持一致。
  • Make Remote Call:服务之间的远程调用。
  • Register:注册服务实例,Client 端向 Server 端注册自身的元数据以提供服务发现。
  • Renew:续约,通过发送心跳到 Server 以维持和更新注册表中服务实例元数据的有效性。当在一定时长内,Server 没有收到 Client 的心跳信息,将默认服务下线,会把服务实例信息从注册表中删除。
  • Cancel:服务下线,Client 再关闭时主动向 Server 注销服务实例元数据,这是 Client 的服务实例数据信息将从注册表中删除。
  • Get Registry:获取注册表,Client 向 Server 请求注册表信息,用于服务发现,从而发起服务之间的远程调用。

这些行为在 Eureka 中发挥着重要的作用,那了解了以上的这些内容相信Eureka中各位的脑海中有了一个大致的轮廓的。比如Eureka是一个服务治理框架,用于服务注册发现,强调AP、用分区的架构来实现集群等。

二、基础应用

好记性不然烂笔头,虽然用在这里不恰当,但明白我的意思就行,讲了很多,下面就来实操搭建一下Eureka的Server,Client端应用程序。

一点说明

  • SpringCloud版本:Finchley.SR3

  • SpringBoot版本:2.0.3.RELEASE

  • 所有项目都是基于SpringBoot开发

2.1 项目基础模块规划

因为 Eureka 是 SpringCloud 的一个组件,所以前期肯定是搭建一个微服务项目,而对于微服务项目,依照本人的搭建习惯,会将项目的目录结构做如下调整:

spring-cloud-study(父工程)
|
----server(业务模块)
	----server-user(业务具体模块)
	----server-XXXX
|
----spring-cloud(cloud模块)
	----spring-cloud-eureka(cloud的各个组件)
	----spring-cloud-config
	----spring-cloud-XXXX

既然目录结构清晰了,那先创建父工程,业务模块,cloud 模块出来

在这里插入图片描述

1)父工程 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.baiqi</groupId>
    <artifactId>springcloud-study</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>pom</packaging>

    <!--父工程名称-->
    <name>springcloud-study</name>

    <!--聚合子模块-->
    <modules>
        <module>spring-cloud</module>
        <module>server</module>
    </modules>

    <!--版本控制-->
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR3</spring-cloud.version>
    </properties>

    <!--版本管理-->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!--web启动器-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2)业务模块 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--父级依赖-->
    <parent>
        <artifactId>springcloud-study</artifactId>
        <groupId>cn.baiqi</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>server</name>
    <!--打包方式:pom-->
    <packaging>pom</packaging>

    <!--业务模块下的子业务模块-->
    <modules>
        
    </modules>
    
    <dependencies>
        <!-- Eureka客户端 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3)cloud 模块 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--父级依赖-->
    <parent>
        <artifactId>springcloud-study</artifactId>
        <groupId>cn.baiqi</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>spring-cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-cloud</name>
    <!--打包方式:pom-->
    <packaging>pom</packaging>

    <!--cloud模块下的子组件模块-->
    <modules>

    </modules>

    <dependencies>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

到此项目的架子算是搭建起来了,创建聚合工程的时候我们要注意如下几点

  1. 父级项目只做聚合,不写任何代码,打包方式为 pom
  2. 父级项目的 pom 文件中要聚合子项目将其下的子项目聚合到<modules>标签中
  3. 子模块的<parent>依赖要写父级坐标,打包方式为 jar

现在可以往项目中添加一个个的组件模块了。

2.2 Eureka注册中心搭建

首先我们在 spring-cloud 模块中添加一个子模块,用来当作 Eureka 注册中心,子模块名:spring-cloud-eureka

1)创建 SpringBoot 项目,目录结构如下:

在这里插入图片描述

2)编写 pom 文件,并导入 Eureka 服务端依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--父项目依赖-->
    <parent>
        <artifactId>spring-cloud</artifactId>
        <groupId>cn.baiqi</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>spring-cloud-eureka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <!--模块名-->
    <name>spring-cloud-eureka</name>

    <dependencies>
        <!--Eureka服务端依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3)编写配置文件:application.yml

server:
  port: 9000 #端口
spring:
  application:
    name: spring-cloud-eureka #应用名称
eureka:
  client:
    service-url:
      defaultZone: http://localhost:9000/eureka #暴露的服务注册中心地址,用于Server和Client之间交互
    register-with-eureka: false #因为本服务是服务注册中心,所以不需要注册自己
    fetch-registry: false #不需要向注册中心获取注册表信息

4)启动类上添加@EnableEurekaServer注解

@EnableEurekaServer // 开启Eureka服务端
@SpringBootApplication
public class SpringCloudEurekaApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringCloudEurekaApplication.class, args);
    }
}

5)启动 Eureka 注册中心服务,访问:http://localhost:9000/,看到如下页面即注册中心搭建完成。

在这里插入图片描述

注册中心搭建好之后,其它服务就可以往这个注册中心注册服务了,按照习惯我准备了两个业务服务:

  1. server-user(调用者)
  2. server-order(提供者)

下面分别搭建一下把。

2.3 Eureka Client搭建-服务提供者

业务服务名:server-order

1)创建 SpringBoot 项目,目录结构如下:

在这里插入图片描述

2)因为 server-order 的父项目是 server ,所以只需在 server 中的 pom 里添加 Eureka 的客户端依赖,这样只要是 server 下面的项目,都会因为依赖传递而获取到相关的依赖,记住别忘了将子项目聚合到父项目中(<modules>标签里)

server-order 的 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
        <!--父级依赖-->
    <parent>
        <artifactId>server</artifactId>
        <groupId>cn.baiqi</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>server-order</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>server-order</name>
    
    <dependencies>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

server 的 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!--父级依赖-->
    <parent>
        <artifactId>springcloud-study</artifactId>
        <groupId>cn.baiqi</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>server</name>
    <!--打包方式:pom-->
    <packaging>pom</packaging>

    <!--业务模块下的子业务模块-->
    <modules>
        <module>server-user</module>
        <module>server-order</module>
    </modules>

    <dependencies>
        <!-- Eureka客户端 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

3)配置文件:application.yml

server:
  port: 9001 #端口
eureka:
  instance:
    lease-renewal-interval-in-seconds: 2 #每间隔2s,向服务端发送一次心跳,证明自己依然"存活"
    lease-expiration-duration-in-seconds: 10 #告诉服务端,如果我10s之内没有给你发心跳,就代表我故障了,将我踢出掉
    prefer-ip-address: true #告诉服务端,服务实例以IP作为链接,而不是取机器名
  client:
    service-url:
      defaultZone: http://localhost:9000/eureka #注册中心地址

spring:
  application:
    name: server-order #服务名

logging:
  level:
    org.springframework: debug #对应模块的日志级别

4)添加一个 Controller,对外提供调用

/**
 * @author J3-白起
 * @package cn.baiqi.serverorder.controller
 * @createTime 2021/5/7 - 14:38
 * @description 订单控制器
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @GetMapping("/getAll/{value}")
    public String getAll(@PathVariable String value) {
        return "order服务提供了相关数据:" + value;
    }

}

该接口被调用后会返回相应的数据

5)Eureka Client 端在启动类中不需要添加 @EnableXXXX 注解,只要添加了 Client 依赖,它就是一个 Eureka Client 应用,直接启动就行了。

2.4 Eureka Client搭建-服务调用者

业务服务名:server-user

1)创建 SpringBoot 项目,目录结构如下:

在这里插入图片描述

2)server-user 模块的 pom 文件同服务提供者一样,不赘述,server 的 pom 文件只需聚合子项目模块就行。

3)配置文件:application.yml

server:
  port: 9002 #端口
eureka:
  instance:
    lease-renewal-interval-in-seconds: 2 #每间隔2s,向服务端发送一次心跳,证明自己依然"存活"
    lease-expiration-duration-in-seconds: 10 #告诉服务端,如果我10s之内没有给你发心跳,就代表我故障了,将我踢出掉
    prefer-ip-address: true #告诉服务端,服务实例以IP作为链接,而不是取机器名
  client:
    service-url:
      defaultZone: http://localhost:9000/eureka #注册中心地址

spring:
  application:
    name: server-user #服务名

logging:
  level:
    org.springframework: debug #对应模块的日志级别

4)添加一个调用 Controller,获取相关数据

/**
 * @author J3-白起
 * @package cn.baiqi.serveruser.controller
 * @createTime 2021/5/7 - 14:46
 * @description 用户控制器
 */
@RestController
@RequestMapping("/user")
public class UserController {

    @Autowired
    private RestTemplate restTemplate;

    @GetMapping("/getUserOrder/{userId}")
    public String getUserOrder(@PathVariable Long userId) {
        // 调用地址
        String url = "http://server-order/order/getAll/{userId}";
        // 调用
        String body = restTemplate.getForEntity(url, String.class, 1L).getBody();
        return body;
    }

    /**
     * 注入一个负载均衡的 restTemplate 用于服务调用
     *
     * @return
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }

}

访问该接口就可以调用对应的服务提供者获得相关数据。

5)Eureka Client 端在启动类中不需要添加 @EnableXXXX 注解,只要添加了 Client 依赖,它就是一个 Eureka Client 应用,直接启动就行了。

2.5 启动测试

依次启动应用,访问 Eureka 的注册中心主页,界面如下:

在这里插入图片描述

访问:http://localhost:9002/user/getUserOrder/2,结果如下

order服务提供了相关数据:2

RestTemplate 将根据服务名 server-order 通过预先从 spring-cloud-eureka 缓存到本地的注册表中获取到 server-order 服务的具体地址 ,从而发起服务间调用。

三、Eureka Client 源码解析

通过上面的服务调用案例,我们基本上不用做过多的复杂操作,即可完成一个远程的服务调用。

所以 Eureka Client 为了简化开发人员的工作,将很多与 Eureka Server 交互的工作都隐藏起来了,自主完成。

下面看看各个阶段,Eureka Client 在后台自动完成的工作图:

在这里插入图片描述

对于 Eureka Client 而言,它没有在启动类上加 @EnableXXXX 注解依然可以正常使用其相关的功能,那么我们肯定就想到了 SpringBoot 的自动装配原理。

找到 Eureka Client 源码包中的 spring.factories 文件就可以知道在启动的时候,自动配置了那些功能。

在这里插入图片描述

在自动装配类中,我们关注下面三个配置类:

  1. EurekaClientAutoConfiguration:Eureka Client 自动配置类,负责Eureka Client 中关键Beans的配置和初始化,如ApplicationManager 和 EurekaClientConfig 等。
  2. RibbonEurekaAutoConfiguration:Ribbon 负载均衡相关配置。
  3. EurekaDiscoveryClientConfiguration:配置自动注册和应用的健康检测器。

3.1 读取应用自身配置

EurekaClientAutoConfiguration 配置类中定义了非常多的Bean,在 SpringBoot 启动的时候就会完成这些Bean的读取和配置,主要的配置如下。

类名作用
EurekaClientConfig封装 Eureka Client 与 Eureka Server 交互所需要的配置信息。SpringCloud 为其提供了一个默认配置类的 EurekaClientConfigBean,可以在配置文件中通过前缀 eureka.client+属性名进行属性覆盖。
ApplicationInfoManager作为应用信息管理器,管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig。
InstanceInfo封装将被发送到 Eureka Server 进行服务注册和服务实例元数据。它在 Eureka Server 的注册表中代表一个服务实例,其他服务实例可以通过 InstanceInfo 了解该服务实例的相关信息从而发起服务请求。
EurekaInstanceConfig封装 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo 通常这些信息在配置文件中的 eureka.instance 前缀下进行设置,SpringCloud 通过 EruekaInstanceConfigBean 配置类提供默认配置。
DiscoveryClientSpringCloud 中定义用来服务发现的客户端接口。

EurekaClientAutoConfiguration 配置类代码:

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class,
                      CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class})
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
                            "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
                            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
    // ......
}

看到这么多注解不要慌,有些不是我们的关注点,我们只需要关注下面几点就可:

关注点一:@ConditionalOnBean注解作用,该类在加载之前环境中必须要有EurekaClientConfig.classEurekaDiscoveryClientConfiguration.Marker.class两个类,否则不加载。

关注点二:@ConditionalOnProperty注解作用,配置文件中eureka.client.enabled属性的值为 ture 时,该配置类才生效,而该属性默认值就为true,所以该属性配置可以忽略。这也是我们不用在 Eureka 客户端程序的启动类上添加@EnableEurekaClient注解,程序依然生效的原因。

关注点三:@AutoConfigureBefore@AutoConfigureAfter注解作用,该类的加载的顺序为@AutoConfigureAfter注解中定义的配置类之后,然后再是该配置类,最后再是@AutoConfigureBefore注解中定义的类加载。

  • 加载顺序:@AutoConfigureAfter ==> EurekaClientAutoConfiguration ==> @AutoConfigureBefore

在加载@AutoConfigureAfter注解时,当加载到EurekaDiscoveryClientConfiguration后,EurekaClientAutoConfiguration配置类就满足了所有定义的条件,即生效了。

在这里插入图片描述

3.2 客户端发现

先来了解一下DiscoveryClient接口,和DiscoveryClient类。

DiscoveryClient 接口是 SpringCloud 框架提供的,主要为了扩展 Netflix 提供的 Eureka 客户端而提供的,该接口的实现类通过组合的方式引入了 Netflix 提供的 DiscoveryClient 类,然后进行了进一步封装,让开发者更加容易使用 SpringBoot 进行基于 Eureka 的开发。

DiscoveryClient 类是 Netflix 开源框架提供的,主要用于与Eureka服务端(即注册中心)进行交互。

DiscoveryClient接口和其默认实现类EurekaDiscoveryClient代码

DiscoveryClient.java

public interface DiscoveryClient {

    // 获取实现类的描述
    String description();

	// 通过服务id获取服务实例信息
    List<ServiceInstance> getInstances(String serviceId);

    // 获取所有服务实例id
    List<String> getServices();

}

EurekaDiscoveryClient.java # getInstances()

public class EurekaDiscoveryClient implements DiscoveryClient {

    // ......

    private final EurekaClient eurekaClient;

    // ......

    @Override
    public List<ServiceInstance> getInstances(String serviceId) {
        List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
                                                                              false);
        List<ServiceInstance> instances = new ArrayList<>();
        for (InstanceInfo info : infos) {
            instances.add(new EurekaServiceInstance(info));
        }
        return instances;
    }

    // ......
}

EurekaDiscoveryClient 实现了 DiscoveryClient 接口,通过 getInstances() 方法会发现,其中又调用了 Netflix 提供的 EurekaClient 接口这种组合形式的方式使得我们非常容易的进行 Eureka 开发。

在这里插入图片描述

通过组合形式,我们就将代码拉回到了 Netflix 包中。

EurekaClient 接口来自于 com.netflix.discovery 包中,默认实现为 com.netflix.discovery.DiscoveryClient 它属于 eureka-client的源代码。

DiscoveryClient 可以说时功能非常的强大,提供了 Eureka Client 注册到 Server上、续租、下线、及获取Server中注册表信息等诸多关键功能。

下面继续分析这个类。

3.3 DiscoveryClient类分析

DiscoveryClient 是 Eureka Client 的核心类,负责与 Eureka Server 交互的关键逻辑,具体功能如下:

  1. 注册服务实例到 Eureka Server 中
  2. 发送心跳更新与 Eureka Server 的续约
  3. 在服务关闭时从 Eureka Server 中取消续约,服务下线
  4. 查询在 Eureka Server 中注册的服务实例列表

上面有该类的类图👆。

DiscoveryClient 继承了 LookupService 接口, LookupService作用是发现活跃的服务实例, 主要方法如下:

public interface LookupService<T> {
    // 根据服务实例注册的appName来获取封装有相同appName的服务实例信息容器
    Application getApplication(String appName);
    // 返回当前注册表中所有的服务实例信息
    Applications getApplications();
    // 根据服务实例的 id 获取服务实例信息
    List<InstanceInfo> getInstancesById(String id);
    // ......
}
  • Application:持有服务实例信息列表,它可以理解成同一个服务的集群信息,这些服实例都挂在同一个服务名 appName 下。
  • Applications:表示注册表中所有服务实例信息的集合
  • InstanceInfo:表示一个服务实例信息。

ApplicationApplications类中,对InstanceInfo的操作都是同步操作,防止出错。

EurekaClientLookupService接口的基础上扩充了更多的接口,并提供了非常丰富的获取服务实例的方式,下面主要关注两个接口。

@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
    
    // ......
    
    // 为Eureka Client注册健康检查处理器
    public void registerHealthCheck(HealthCheckHandler healthCheckHandler);

    // 为Eureka Client注册一个EurekaEventListener(事件监听器)监听client服务器实力信息的更新
    public void registerEventListener(EurekaEventListener eventListener);
    
    // ......
}

在 Eureka Server 中一般通过心跳(heartbeats)来识别一个实例的状态。Eureka Client 中存在一个定时任务定时通过 HealthCheckHandler 检测当前 Client 的状态,如果 Client 的状态发生改变,将会触发新的注册事件,更新 Eureka Server 的注册表中该服务的相关信息。

HealthCheckHandler代码如下:

public interface HealthCheckHandler {

    InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);

}

Eureka 中的事件监听模式属于观察者模式,事间监听器将监听 Client 的服务实例信息变化,触发对应的处理事件,EurekaEvent类图如下。

在这里插入图片描述

3.4 DiscoveryClient构造函数

咱们再回到EurekaClientAutoConfiguration类中。

当程序加载该类的时候,关注如下代码:

public class EurekaClientAutoConfiguration {
    // ......
		@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		@org.springframework.cloud.context.config.annotation.RefreshScope
		@Lazy
		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance,
				@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
			//If we use the proxy of the ApplicationInfoManager we could run into a problem
			//when shutdown is called on the CloudEurekaClient where the ApplicationInfoManager bean is
			//requested but wont be allowed because we are shutting down.  To avoid this we use the
			//object directly.
			ApplicationInfoManager appManager;
			if (AopUtils.isAopProxy(manager)) {
				appManager = ProxyUtils.getTargetObject(manager);
			}
			else {
				appManager = manager;
			}
            // 创建Eureka Client
			CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
					this.context);
			cloudEurekaClient.registerHealthCheck(healthCheckHandler);
			return cloudEurekaClient;
		}
    // ......
}

该类加载时,Eureka Client 的初始化入口就是从这里开始进入的,具体的说,可以是下面这都段代码:

// 创建Eureka Client
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
                                                            this.context);

进入 CloudEurekaClient 类的构造器,可以确认最终它调用的是 DiscoveryClient类的构造器。

DiscoveryClient 构造器所做的功能非常多,包含:

  1. Eureka Client 从 Eureka Server 中拉取注册表信息
  2. 服务注册
  3. 初始化发送心跳
  4. 缓存刷新(重新拉去注册表信息)
  5. 按需注册定时任务

上面所列的功能,可以说是贯穿了 Eureka Client 启动阶段的各项工作。

构造器代码如下:

@Inject // java自带的自动注入注解
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ......
}

简单介绍一下构造参数:

  1. ApplicationInfoManager:上面提到过,它是应用信息管理器。
  2. EurekaClientConfig:上面提到过,封装了 Client 和 Server 交互配置信息的类。
  3. AbstractDiscoveryClientOptionalArgs:注入一些可选参数,和一些像 jersey1,jersey2等通用过滤器。
  4. BackupRegistry:备份注册中心的职责,当 Eureka Client 无法从任何一个Eureka Server 中获取注册表信息时,BackupRegistry将被调用以获取注册表信息。

从构造器代码往下看,忽略一些参数校验,属性赋值等之后,先来关注下面几行代码:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    // ......
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // 客户端配置 大部分信息采用DefaultEurekaClientConfig
    clientConfig = config;
    // 已经过时,主要为了兼容遗留客户端问题
    staticClientConfig = clientConfig;
    // 传输层如http请求超时、重试等信息
    transportConfig = config.getTransportConfig();
    // 该eureka实例的信息 如主机信息,健康检测接口等
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }
    // 备份注册表信息,当服务端不可用时,客户度可以从这里获取注册表信息
    this.backupRegistryProvider = backupRegistryProvider;
    // 如果eureka server的地址来源dns服务,则随机获取urls
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    // 采用cas Applications存放的时服务器返回的存储客户端注册信息的
    localRegionApps.set(new Applications());
    // cas 递增版本,防止客户端注册旧的信息
    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // 判断是否需要从eureka server 获取注册表信息 并初始化相应的度量信息
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }
    // 是否需要将信息注册到eureka server上,通过这个开关可以实现,
    // 只获取其他实例的信息,而不将自己的信息给其他客户端发现
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }
    // 如果不需要注册信息到server和拉取注册信息表,则初始化完成。
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        // ......
        return;  // no need to setup up an network tasks and we are done
    }
    // ......
}

上面两个 if 判断语句中,就对应着 Eureka 拉取注册列表和注册服务,如果两个都为 flase 得话,那 Discovery 初始化直接结束,表示该客户端既不进行服务注册也不进行服务发现。

config.shouldFetchRegistry()对应的配置:eureka client.fetch.register

  • 为 true 则表示将从 Eureka Server 中拉去注册表信息

config.shouldRegisterWithEureka()对应的配置:eureka.client.register.with-eureka

  • 为 true 则表示将注册到 Eureka Server 中

往下看,接着定义了一个线程池 ScheduledExecutorService ,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池。

scheduler = Executors.newScheduledThreadPool(2,
                                             new ThreadFactoryBuilder()
                                             .setNameFormat("DiscoveryClient-%d")
                                             .setDaemon(true)
                                             .build());
// 发送心跳线程池
heartbeatExecutor = new ThreadPoolExecutor(...);  // use direct handoff
// 缓存刷新线程池
cacheRefreshExecutor = new ThreadPoolExecutor(...);  // use direct handoff

往下看,初始化了 Eureka Client 和 Eureka Service 进行 HTTP 交互的 Jersey 客户端,将 AbstractDiscoveryClientOptionalArgs 中的属性用来构建 EurekaTransport。

eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);

往下看,接着从 Eureka Server 中拉取注册表信息。

if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
    fetchRegistryFromBackup();
}

如果 EurekaClientConfig # shouldFetchRegistry 为 ture 时,fetchRegistry 方法将被调用。

在 Eureka Client 向 Eureka Server 注册前,需要先从 Eureka Server 中拉取注册表中的信息,这是服务发现的前提。通过将 Eureka Server 中的注册表信息缓存到本地,就可以就近获取其它服务的相关信息,减少与 Eureka Server 的网络通信。

拉取完 Eureka Server 中的注册表信息后,将对服务实例进行注册,代码如下:

if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    try {
        // 发起服务注册
        if (!register() ) {
            // 失败抛出异常
            throw new IllegalStateException("Registration error at startup. Invalid server response.");
        }
    } catch (Throwable th) {
        logger.error("Registration error at startup: {}", th.getMessage());
        throw new IllegalStateException(th);
    }
}

// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
// 最后初始化定时任务
initScheduledTasks();

在服务注册之前会进行注册预注册,Eureka 没有对此提供默认实现。构造函数的最后将初始化并启动发送心跳,刷新缓存和按需注册等定时任务。

看到这里,DiscoveryClient 的构造函数中重要的功能步骤都讲解了一下,那咱们来总结一下在这里面所做的所有功能:

  1. 相关属性的赋值,如:ApplicationInfoManager,EurekaClientConfig等。
  2. 备份注册中心的初始化,默认没有实现。
  3. 拉取 Eureka Server 注册表中的信息
  4. 注册前的预处理
  5. 向 Eureka Server 注册自身
  6. 初始化心跳定时任务,缓存刷新和按需注册等定时任务

来一个清晰的流程图:
在这里插入图片描述

整体的 Eureka Client 初始化的功能都分析完了,下面根据具体的功能进行分析,如拉取注册信息,续约,心跳,注册等。

3.5 拉取注册表信息

拉取注册表的入口在,DiscoveryClient类的 fetchRegistry 方法,相关代码如下:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // 如果增量式拉取被禁止,或者Applications为null,进行全量拉取
        Applications applications = getApplications();

        if (clientConfig.shouldDisableDelta()
            || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
            || forceFullRegistryFetch
            || (applications == null)
            || (applications.getRegisteredApplications().size() == 0)
            || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            // 全量拉取注册表信息
            getAndStoreFullRegistry();
        } else {
            // 增量拉取注册表信息
            getAndUpdateDelta(applications);
        }
        // 计算应用集合一致性哈希码
        applications.setAppsHashCode(applications.getReconcileHashCode());
        // 打印注册表上所有服务实例的总数量
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    // 在更新远程实例状态之前推送缓存刷新事件,但是Eureka 中并没有提供默认的事件监听器
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    // 基于缓存中被刷新的数据更新远程实例状态
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    // 注册表拉取成功,返回true
    return true;
}

当 Eureka 客户端第一次注册到 Eureka 服务端时才会进行全量拉取,其他时候都只进行增量拉取,下面来看看这两个拉取注册表信息的具体逻辑。

  • DiscoveryClient # getAndStoreFullRegistry():全量拉取
  • DiscoveryClient # getAndUpdateDelta():增量拉取

3.5.1 全量拉取注册表信息

DiscoveryClient # getAndStoreFullRegistry()代码如下:

private void getAndStoreFullRegistry() throws Throwable {
    // 获取拉取的注册表版本,防止拉取版本落后(由其他的线程引起)
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    // 全量拉取注册表
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    // 获取成功
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
        // 检查currentUpdateGeneration的更新版本是否发生改变,无改变得华说明版本拉取是最新得
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // 从apps中筛选出状态为 UP 得实例,同时打乱实例得顺序,方式同一个服务得不同实例在其同时接受流量
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

全量拉取的方式将会从 Eureka Server 中拉取注册表中所有得服务实例信息(封装在 Applications中),并经过处理后替换掉本地注册表缓存Applications。

通过 debug 启动方式启动,我们可以看到向 Eureka Server 全量拉取服务注册表信息的 url 为:http://localhost:9000/eureka/apps,前面“http://localhost:9000/eureka”是配置文件配置的注册中心地址,后面“/apps/”是程序自己拼接的。

具体代码:AbstractJerseyEurekaHttpClient # getApplicationsInternal,截图如下。

在这里插入图片描述

getAndStoreFullRegistry() 方法是有可能被多个线程同时调用的,这会导致拉取的注册表被旧的注册表所覆盖(有可能出现先拉取注册表信息的线程在覆盖 apps 时被阻塞,被后拉取注册表信息的线程抢先设置了 apps,被阻塞的线程恢复后在次设置了 apps,导致 apps 数据版本落后),产生脏数据,对此,Eureka 通过类型为 AtomicLong 的 currentUpdateGeneration 对 apps 的更新版本进行跟踪。如果跟新版本不一致,说明本次拉取注册表信息已过时,不需要缓存到本地。

最后对拉取到的注册表信息 apps 进行了筛选,只保留状态为 UP 的服务实例信息。

3.5.2 增量拉取注册表信息

DiscoveryClient # getAndUpdateDelta()代码如下:

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // 增量拉取注册表
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
	// 拉取失败,进行全量拉取
    if (delta == null) {
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
        // 全量拉取
        getAndStoreFullRegistry();
        // 检查currentUpdateGeneration的更新版本是否发生改变,无改变得华说明版本拉取是最新得
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 更新本地缓存
                updateDelta(delta);
                // 计算应用集合一致性哈希码
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // 比较应用集合一致性哈希码,如果不一致将认为本次增量式拉取数据已脏,将发起全量拉取更新本地缓存注册表信息
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

增量更新注册表信息,访问 Eureka Server 的 url 为:http://localhost:9000/eureka/apps/delta,前面“http://localhost:9000/eureka”是配置文件配置的注册中心地址,后面“/apps/delta”是程序自己拼接的。

具体代码:AbstractJerseyEurekaHttpClient # getApplicationsInternal,截图如下。

在这里插入图片描述

由于更新的过程时间比较久,时间成本为O(N^2),所以需要通过同步代码快防止多个线程同时进行更新,污染数据。

在根据从 Eureka Server 拉取的 delta 信息更新本地缓存的时候,Eureka 定义了 ActionType 来标记变更状态,代码在 InstanceInfo 类中,代码如下:

public enum ActionType {
    // 添加 Eureka Server
    ADDED, // Added in the discovery server
    // 在 Eureka Server 中的信息已经修改
    MODIFIED, // Changed in the discovery server
    // 被从 Eureka Server 中删除
    DELETED // Deleted from the discovery server
}

在更新本地缓存的时候,根据 InstanceInfo # ActionType 的不同,对 delta 中的 InstanceInfo 采取不同的操作,其中 ADDED 和 MODIFIED 状态变更的服务实例信息将添加到本地注册标中,DELETED 状态变更的服务实例将从本地注册表中删除,具体代码如下:

private void updateDelta(Applications delta) {

    // ......

    // 变更类型为 ADDED
    if (ActionType.ADDED.equals(instance.getActionType())) {
        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
        if (existingApp == null) {
            applications.addApplication(app);
        }
        logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
        // 添加到本地注册表
        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
        // 变更类型为 MODIFIED
    } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
        if (existingApp == null) {
            applications.addApplication(app);
        }
        logger.debug("Modified instance {} to the existing apps ", instance.getId());
        // 添加到本地注册表
        applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
        // 变更类型为 DELETED
    } else if (ActionType.DELETED.equals(instance.getActionType())) {
        Application existingApp = applications.getRegisteredApplications(instance.getAppName());
        if (existingApp == null) {
            applications.addApplication(app);
        }
        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
        // 从本地注册表中移除
        applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
    }

    // ......

}

在更新完本地缓存的注册表中之后,Eureka Client 会通过 DiscoveryClient # getReconcileHashCode(Applications applications)方法计算合并后的 Applications 的 appHashCode (应用集合一直性哈希码),和 Eureka Server 传递的 delta 上的 appsHashCode 进行比较(delta 中携带的 appsHashCode 通过 Eureka Server 的全量注册表计算得出),比对客户端和服务端上注册表的差异。如果哈希值不一致的话将再次调用一次 getAndStoreFullRegistry 获取全量数据保证 Eureka Client 与 Eureka Server 之间注册表数据的一致。

private void getAndUpdateDelta(Applications applications) throws Throwable {
    
    // ......
    
    // There is a diff in number of instances for some reason
    if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
        reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
    }

    // ......
    
}

在 reconcileAndLogDifference() 方法中将会执行全量拉取注册表信息操作

appsHashCode 的一般表示为:

appsHashCode = ${status}_ ${count} _

它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。比如:有 10 个应用实例的状态为 UP ,有 5 个应用实例状态为 DOWN ,其中它的状态数量为 0 (不进行表示),那么 appsHashCode 的形式如下是:

appsHashCode = UP_ 10 _ DOWN _ 5 _

3.4.3 拉取注册表流程图

在这里插入图片描述

3.6 服务注册

在拉取完 Eureka Server 中的注册表信息并将其缓存在本地后,Eureka Client 将向 Eureka Server 注册自身服务实例元数据,主要逻辑在 DiscoveryClient # register 方法中。代码如下:

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // 发起注册
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    // 注册成功
    return httpResponse.getStatusCode() == 204;
}

在注册的时候,Eureka CLient 会将自身信息封装成实例元数据(InstanceInfo中)发送到 Eureka Server 中请求服务注册,当 Eureka Server 返回 204 状态码,表示注册成功。

相关的注册 url 在追踪到 AbstractJerseyEurekaHttpClient # register即可看出为,http://localhost:9000/eureka/apps/SERVER-USER,其中http://localhost:9000/eureka/是配置配置文件指定的注册中心地址,/apps/${app_name}则是具体的服务注册,参数为 InstanceInfo 实例名称。

debug 查看注册url,如下图:

在这里插入图片描述

3.7 初始化定时任务

在 Eureka Client 应用中,服务的注册是一个持续的过程,所以 Eureka Client 会通过定时发送心跳的方式于 Eureka Server 进行通信,维持自己在 Server 注册表上的续租。

同时,Eureka Server 注册表中的服务实例信息是动态变化的,为了保持 Eureka Client 与 Eureka Server 的注册表信息一致性,Eureka Client 会定时向 Eureka Server 拉取服务注册表信息并更新本地缓存。

并且 Eureka Cliten 为了监控自身应用信息和状态的变化,Eureka Client 设置了一个按需注册的定时器,定时检查自身应用信息活者状态变化,并在发生变化时向 Eureka Server 重新注册,避免注册表中的本服务实例信息不可用。

DiscoveryClient # initScheduledTasks 方法中初始化了三个定时器任务:

  • 一个用于向 Eureka Server 拉取注册表信息刷新本地缓存
  • 一个用于向 Eureka Server 发送心跳
  • 一个用于进行按需注册操作

相关代码如下:

private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // 注册表缓存刷新定时器
        // 获取配置文件中刷新间隔,默认为 30s,可以通过 eureka.client.registry-fetch-interval-seconds进行设置
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
            new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
            ),
            registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        // 发送心跳定时器,默认30秒发送一次心跳
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // 心跳定时器
        scheduler.schedule(
            new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
            ),
            renewalIntervalInSecs, TimeUnit.SECONDS);

        // 下面是按需注册定时器相关逻辑
        
        // ......
        
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

3.7.1 缓存刷新定时任务与发送心跳定时任务

DiscoveryClient # initScheduledTasks 方法中,通过 ScheduledExecutorService # schedule 的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由 TimedSupervisorTask 提供实现。

TimedSupervisorTask 继承了 TimeTask ,提供执行定时任务的功能。它在 run 方法中定义执行定时任务的逻辑,具体代码如下:

public class TimedSupervisorTask extends TimerTask {

    // ......

    @Override
    public void run() {
        Future<?> future = null;
        try {
            // 执行任务
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            // 等待任务执行结果
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            // 执行完成,设置下次任务执行频率(时间间隔)
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            // 执行任务超时
            timeoutCounter.increment();
			// 设置下次任务执行频率(时间间隔)
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
			// 执行任务被拒绝,统计被拒绝次数
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }
			// 其他异常,统计异常次数
            throwableCounter.increment();
        } finally {
            // 取消非结束任务
            if (future != null) {
                future.cancel(true);
            }
			// 如果定时任务未关闭,定义下一次任务
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }

    // ......

}

rum方法中主要实现功能就是定时执行一个任务调度,其过程如下:

  1. schedule 初始化并延迟执行 TimedSupervisorTask
  2. TimedSupervisorTask 将 task 提交 executor 中执行,task 和 executor 在初始化 TimedSupervisorTask 时传入。
  3. 当 task 正常执行,TimedSupervisorTask 将自己提交到 schedule ,延迟 delay 时间后再此执行。
  4. 当 task 执行超时,计算新的 delay 时间 (不超过 maxDelay ),TimedSupervisorTask将自己提交到 schedule,延迟 delay 时间后再次执行。

其执行流程图:

在这里插入图片描述

TimedSupervisorTask通过这种不断循环提交任务的方式,完成定时执行任务的功能。

DiscoveryClient # initScheduledTasks 方法中,提交缓存刷新定时任务的线程任务为 CacheRefreshThread,提交发送心跳定时任务的线程为 HeartThread

CacheRefreshThread继承了 Runnable 接口,代码如下:

class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
    @VisibleForTesting
    void refreshRegistry() {
        try {
            // .....
            // 判断远程Region是否改变(即 Eureka Server 地址是否发生改变),决定进行全量拉去还是增量拉取
            boolean success = fetchRegistry(remoteRegionsModified);
            // .....
            // 打印更新注册表缓存后的变化
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }
    }
}

CacheRefreshThread 线程任务将委托 DiscoveryClient # fetchRegistry 方法进行缓存化系的具体操作。

HeartThread同样继承了 Runnable 接口,该任务的作用是向 Eureka Server 发送心跳请求,维持 Eureka Client 在注册表中的续约,代码如下:

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

可看出,其主要的逻辑代码在 renew() 方法中,代码如下:

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // 调用 HTTP 发送心跳到 Eureka Server 中维持租续
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        // Eureka Server 中不存在该应用实例
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            // 重新注册
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        // 续约成功
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

Eureka Server 会根据续租提交的 appName 与 instanceInfoId 来更新注册表中的服务实例的续租。当注册表中不存在该服务实例时,将返回 404 状态码,发送心跳请求的 Eureka Client 在接受到 404 状态码后将会重新发起注册,如果续约成功,将会返回 200 状态码。

通过 debug 我们可以在 AbstractJerseyEurekaHttpClient # sendHeartBeat 方法中,可以发现服务续租调用的接口即传递的参数,如图:

在这里插入图片描述

续租的 url :http://localhost:9000/eureka/apps/ A P P N A M E / {APP_NAME}/ APPN?AME/{INSTANCE_INFO_ID},HTTP方法为put,参数主要有 status(当前服务状态),lastDirtyTimestamp(上次数据变化时间)以及 overriddenStatus。

3.7.2 按需注册定时任务

按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例有效和可用。按需注册代码如下:

// InstanceInfo replicator
// 定时检查刷新服务实例信息,检查是或否有变化,是否需要重新注册
instanceInfoReplicator = new InstanceInfoReplicator(
    this,
    instanceInfo,
    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
    2); // burstSize
// 监控应用的 status 变化,发生变化即可发起重新注册
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    // 注册应用状态改变监听器
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 启动定时按需注册定时任务
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

按需注册功能分为两部分:

  1. 一部分定义了一个定时任务,定时刷新服务实例的信息和检查应用状态的变化,在服务实例信息发生改变的情况下向 Eureka Server 重新发起注册操作。
  2. 一部分时注册状态改变监控器,在应用状态改变的情况时,刷新服务实例信息,在服务实例信息发生改变的情况下 Eureka Server 重新发起注册操作。

instanceInfoReplicator 中的定时任务逻辑位于 #run 方法中,如下所示:

public void run() {
    try {
        // 刷新了 InstanceInfo中的服务实例信息
        discoveryClient.refreshInstanceInfo();
		// 如果数据发生改变,则返回数据更新时间
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // 注册服务实例
            discoveryClient.register();
            // 重置更新状态
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 执行下一个延时任务
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

其中 DiscoveryClient 中刷新本地服务实例信息和检查服务状态变化的代码如下(discoveryClient.refreshInstanceInfo):

void refreshInstanceInfo() {
    // 刷新服务实例信息
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    // 更新租续的信息
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus status;
    try {
        // 调用 HealthCheckHandler 检查服务实例的状态变化
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }

    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}

instanceInfoReplicator # run 方法中,首先调用了 discoveryClient.refreshInstanceInfo 方法刷新当前的服务实例信息,查看当前服务实例信息和服务状态是否发生变化,如果当前服务实例信息或者服务状态发生变化将向 Eureka Server 重新发起服务注册操作。

最后再此声明了一下延时任务,用于再测调用 run 方法,继续检查服务实例信息和服务状态的变化,在服务实例信息发生变化的情况下重新发起注册。

如果 Eureka Client 的状态发生变化(在 SpringBoot 通过 Actuator 对服务状态进行监控,具体实现为 EurekaHealthCheckHandler),注册在 ApplicationInfoManager 的状态改变监控器将会被触发,从而调用 InstanceInfoReplicator # onDemandUpdate 方法,检查服务实例信息和服务状态的变化,可能会引发按需注册任务。代码如下:

public boolean onDemandUpdate() {
    // 控制流量,当超过限制时,不能进行按需更新
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    // 取消上次 run 任务
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }
                    // 重新启动 run 方法
                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

InstanceInfoReplicator # onDemandUpdate 方法调用 InstanceInfoReplicator # run 方法检查服务实例信息和服务状态的变化,并在服务实例信息发生变化的情况下向 Eureka Server 发起重新注册的请求。

为了防止重新重复执行 run 方法, onDemandUpdate 方法还会取消执行上次已提交且为未完成的 run 方法,执行最新的按需注册任务。

按需注册定时任务的处理流程如图:

在这里插入图片描述

3.8 服务下线

服务下线就是 Eureka Client 向 Eureka Server 注销自身在注册表中的信息,DiscoveryClient 中对象在销毁前执行的清理方法如下:

@PreDestroy
@Override
public synchronized void shutdown() {
    // 同步方法
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // 原子操作,确保只执行一次
        if (statusChangeListener != null && applicationInfoManager != null) {
            // 注销状态监听器
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // 取消定时任务
        cancelScheduledTasks();

        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            // 服务下线
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        // 关闭 Jersy 客户端
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        // 关闭相关 Monitor
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        logger.info("Completed shut down of DiscoveryClient");
    }
}

在销毁 DiscoveryClient 之前,会进行一系列的清理工作,包括 ApplicationInfoManager 中的 StatusChangeListener、取消定时任务、服务下线和关闭 Jersey 客户端等。

下面我们主要看 unregister 服务下线方法,代码如下:

void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister  status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}

其中调用了 AbstractJerseyEurekaHttpClient # cancel 方法中,可以发现服务下线调用的接口以及传递的参数,代码如下:

@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        response = resourceBuilder.delete(ClientResponse.class);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}

服务下线的接口地址为 apps/ A P P N A M E / {APP_NAME}/ APPN?AME/{INSTANCE_INFO_ID},参数为服务名称和服务实例id,HTTP方法为delete。

3.9 Eureka Client源码执行流程总图

在这里插入图片描述

四、Eureka Server 源码解析

Eureak Server 作为一个开箱即用的服务注册中心,提供了以下功能,用以满足与 Eureka Client 交互的需求:

  1. 服务注册
  2. 接受服务心跳
  3. 服务剔除
  4. 服务下线
  5. 集群同步
  6. 获取注册表中服务实例信息

同时 Eureka Server 它也是一个 Eureka Client,所以在不禁用 Eureka Server 的客户端行为时,它是会向它配置文件中的其他 Eureka Server 进行拉取注册表、服务注册和发送心跳等操作。

下面先看注册表的类关系 InstanceRegistry ,为后面的服务注册Eureka-Server 集群复制做整体的铺垫。

4.1 服务实例注册表

InstanceRegistry 是 Eureka Server 中注册表管理的核心接口,类结构图如下:

在这里插入图片描述

  • InstanceRegistry类,对 Eureka Server 的注册表实现类 PeerAwareInstanceRegistryImpl 进行了继承和扩展,使其适配 Spring Cloud 的使用环境,主要实现由 PeerAwareInstanceRegistryImpl 提供。
  • InstanceRegistry接口,是 Eureka Server 注册表的最核心接口,其职责是在内存中管理注册到 Eureka Server 中的服务实例信息。
  • LeaseManager接口,是对注册到 Eureka Server 中的服务实例租续进行管理。
  • LookupService接口,是提供对服务实例进行检索,发现活跃的服务实例功能(Eureka Client源码介绍中讲过)。

从上而下,LeaseManager 接口提供的方法代码如下:

public interface LeaseManager<T> {

    // 注册,创建新的租约
    void register(T r, int leaseDuration, boolean isReplication);

    // 下线,取消指定服务的租约
    boolean cancel(String appName, String id, boolean isReplication);

    // 续约,更新指定服务的租约
    boolean renew(String appName, String id, boolean isReplication);

    // 剔除,剔除服务实例
    void evict();
}

LeaseManager 接口的作用是对注册到 Eureka Server 中的服务实例续约进行管理,分别有服务注册、服务下线、服务租续更新以及服务剔除等操作。

LeaseManager 中管理的对象是 Lease ,Lease 代表一个 Eureka Clietn 服务实例信息的租续,它提供了对其内持有的类的事件有效性操作。Lease 持有的类是代表服务实例信息的 InstanceInfo。Lease中定义了租约的操作,分别是注册、下线、更新,同时提供了对租约中事件属性的各项操作。租约默认的有效时长(duration)为 90 秒

InstanceRegistry接口 在继承 LeaseManagerLookupService 接口的基础上,还添加了一些特有的方法,可以更为简单地管理服务实例租约和查询注册表中的服务实例信息。可以通过 AbstractInstanceRegistry 查看 InstanceRegistry 接口方法的具体实现。

PeerAwareInstanceRegistry 继承了 InstanceRegistry 接口,在其基础上添加了 Eureka Server 集群同步的操作,其实现类 PeerAwareInstanceRegistryImpl 继承了 AbstractInstanceRegistry 的实现,在对本地注册表操作的基础上添加了对其 peer 节点的同步复制操作,使得 Eureka Server 集群中的注册表信息保持一致。

4.2 服务注册

Eureka Client 在发起服务注册时会将自身服务实例元数据封装在 InstanceInfo 中,然后将 InstanceInfo 发送到 Eureka Server。Eureka Server 在接收到 Eureka Client 发送的 InstanceInfo 后将会尝试将其放到本地注册表中以供其他 Eureka Client 进行服务发现。

服务注册的主要实现位于 AbstractInstanceRegistry # registry 方法中,代码如下:

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 获取读锁
        read.lock();
        // 这里的 registry 是 ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 类型,根据 appName 对服务实例集群进行分类
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent(key,value):key和value进行了关联则返回 value,否则将 key和value进行关联,返回null
            // 这里有一个比较严谨的操作,防止再添加新得服务实例集群租续时把已有其他线程添加的集群租续覆盖掉,如果存在该键值,直接返回已存在的值;否者添加改键值对,返回null
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        // 根据 instanceId 获取实例租续
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            // 如果该实例的租续已经存在,比较最后更新时间戳大小,取最大值的注册信息为有效
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // 租续不存在,这是一个新的注册实例
            synchronized (lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    // 自我保护机制
                    // Since the client wants to cancel it, reduce the threshold
                    // (1
                    // for 30 seconds, 2 for a minute)
                    this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                    this.numberOfRenewsPerMinThreshold =
                        (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        // 创建新的租续
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 如果租续存在,继承租续的服务上线初始值
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 保存租续
        gMap.put(registrant.getId(), lease);
        // 用来统计最近注册服务实例的数据
        synchronized (recentRegisteredQueue) {
            // 添加最近注册队列
            recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                         + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        // 根据服务状态规则得到服务hi里的最终状态,并设置服务实例的当前状态
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        // 如果实例状态为 UP,设置租约的服务上线时间,只有第一次设置有效
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        // 添加最近租约变更记录队列,表示 ActionType 为 ADDED,这将用于 Eureka Client 增量式获取注册表信息
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // 设置服务实例信息更新时间
        registrant.setLastUpdatedTimestamp();
        // 设置 response 缓存过期,这将用于 Eurkea Client 全量获取注册表信息
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // 释放锁
        read.unlock();
    }
}

registry 的结构:

registry:ConcurrentHashMap<appName, Map<instanceId, Lease<InstanceInfo>>>

在 registry 中,服务实例的 InstanceInfo 保存在 Lease 中,Lease 在 AbstractInstanceRegistry 中统一通过 ConcurrentHashMap 保存在内存中。

在服务注册过程中,会先获取一个读锁,防止其他线程对 registry 注册表进行数据操作,避免数据的不一致。然后从 resgitry 查询对应的 InstanceInfo 租续是否已经存在注册表中,根据 appName 划分服务集群,使用 InstanceId 唯一标记服务实例。如果租约存在,比较两个租约中的 InstanceInfo 的最后更新时间 lastDirtyTimestamp,保留时间戳最大的服务实例信息 InstanceInfo。如果租约不存在,意味着这是一个全新的服务注册,将会进行自我保护的统计,创建新的租约保存 InstanceInfo。接着将租约放到 resgitry 注册表中。

之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将 InstanceInfo 加入用于统计 Eureka Client 增量式获取注册表信息得 recentlyChangedQueue 和失败 responseCache 中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁完成服务注册。

在服务注册中,registry 方法为了防止数据被错误的覆盖而进行了大量的同步操作(读写锁,synchronized 锁)。

4.3 接受服务心跳

我们知道,Eureka Client 为了向服务端证明自己是有用的,会定时向 Eureka Server 发送一个心跳(默认30秒一次),以此来证明自己可用不被 Eureka Server 从注册表中剔除掉。

在 Eureka Server 中处理心跳请求的核心逻辑位于 AbstractInstanceRegistry # renew

renew 方法是对 Eureka Client 位于注册表中的租约的续租操作,不像 register 方法需要服务实例信息,进根据服务实例的服务名和服务实例 id 即可更新对应租约的有效时间,源码如下:

public boolean renew(String appName, String id, boolean isReplication) {
    RENEW.increment(isReplication);
    // 根据appName获取服务集群的租约集合
    Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
    Lease<InstanceInfo> leaseToRenew = null;
    if (gMap != null) {
        // 从集合中获取租约
        leaseToRenew = gMap.get(id);
    }
    if (leaseToRenew == null) {
        // 租约不存在 直接返回 false
        RENEW_NOT_FOUND.increment(isReplication);
        logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
        return false;
    } else {
        InstanceInfo instanceInfo = leaseToRenew.getHolder();
        if (instanceInfo != null) {
            // touchASGCache(instanceInfo.getASGName());
            // 根据覆盖状态规则得到服务实例的最终状态
            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                    instanceInfo, leaseToRenew, isReplication);
            if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                // 如果得到的服务实例最后状态是 UNKNOWN 取消续约
                logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                        + "; re-register required", instanceInfo.getId());
                RENEW_NOT_FOUND.increment(isReplication);
                return false;
            }
            // instanceInfo 中的状态改变,更新
            if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                logger.info(
                        "The instance status {} is different from overridden instance status {} for instance {}. "
                                + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                instanceInfo.getOverriddenStatus().name(),
                                instanceInfo.getId());
                // 更新服务状态
                instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

            }
        }
        // 统计每分钟续租的次数 用于自我保护
        renewsLastMin.increment();
        // 更新租约中的有效时间
        leaseToRenew.renew();
        return true;
    }
}

在 renew 方法中,不关注 InstanceInfo ,仅关注于租约本身以及租约的服务实例状态。如果根据服务实例的 appName 和 instanceInfoId 查询出服务实例的租约,并且根据 getOverriddenInstanceStatus 方法等到的 instanceStatus 不为 InstanceStatus.UNKNOWN 那么更新租约中的有效时间,即更新租约 Lease 中 lastUpdateTimestamp ,达到续约的目的;如果租约不存在,那么返回续租失败。

4.4 服务剔除

如果 Eureka Client 在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务中获取到反馈,所以需要服务剔除 AbstractInstanceRegistry # evict 方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期续租剔除。源代码如下:

@Override
public void evict() {
    evict(0l);
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        // 自我保护相关,出现该状态,不允许剔除服务
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // 遍历注册表register 一次性获取所有的过期租约
    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    // 计算最大允许剔除的租约的数量 获取注册表租约总数
    int registrySize = (int) getLocalRegistrySize();
    // 计算注册表租约的阀值 与自我保护相关
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;
    // 计算剔除租约的数量
    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        // 逐个随机剔除
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // 逐个剔除
            internalCancel(appName, id, false);
        }
    }
}

服务剔除将会遍历 registry 注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数量减去当前注册表租约阈值),分批次剔除过期的服务实例租约。

对过期的服务实例租约调用 AbstractInstanceRegistry # internalCancel 服务下线的方法将其从注册表中清除掉。

为了保证 Eureka Server 的可用性,在服务剔除 evict 方法中有很多限制:

  1. 自我保护时期不能进行服务剔除操作。
  2. 过期操作是分批进行。
  3. 服务剔除是随机逐个剔除,剔除均匀分布在所有应用中,防止在同一时间内同意服务集群中的服务全部过期被剔除,以致大量剔除发生,在未进行自我保护前提促使程序崩溃。

服务剔除是一个定时的任务,所以 AbstractInstanceRegistry 中定义了一个 EvictionTask 用于定时执行服务剔除,默认 60 秒一次。

服务剔除的定时任务一般在 AbstractInstanceRegistry 初始化结束后进行,按照执行频率 evictionIntervalTimerInMs 的设定,定时剔除过期的服务实例租约。

自我保护机制主要在 Eureka Client 和 Eureka Server 之间存在网络分区的情况下发挥保护作用,在服务器端和客户端都有对应的实现。

假设在某种特定的情况下(网络故障…), Eureka Client 和 Eureka Server 无法进行通信,此使 Eureka Client 无法向 Eureka Server 发起祖册和续约请求,Eureka Server 中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险,然而此使的 Eureka Client 可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除掉先然是不合理的。

针对上面那种情况,Eureka 设计了”自我保护机制“。在 Eureka Server 处,如果出现大量服务实例过期被剔除的现象,那么改 Server 节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后在退出该模式;在Eureka Client 处,如果向 Eureka Server 注册失败,将快速超时并尝试于其他的 Eureka Server 进行通信。”自我保护机制“的设计大大提高了 Eureka 的可用性

4.5 服务下线

Eureka Client 在应用销毁时,会向 Eureka Server 发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清楚工作。

服务下线的主要实现代码位于 AbstractInstanceRegistry # internalCancel 方法中,仅需要服务实例的服务名和服务实例的 id 即可完成服务下线。源代码如下:

protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        // 获取读锁,防止被其他线程进行修改
        read.lock();
        CANCEL.increment(isReplication);
        // 根据appName获取服务实例的集群
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            // 移除服务实例的租约
            leaseToCancel = gMap.remove(id);
        }
        // 将服务实例信息添加到最近下线服务实例统计队列
        synchronized (recentCanceledQueue) {
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        }
        // 移除实例的状态信息
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        // 租约不存在,返回 false
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            // 设置租约的下线时间
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                // 添加最近租约变更记录队列,标识ActionType DELETED
			    // 这将用于Eureka Client 增量式获取注册表信息
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            // 设置 response 缓存过期
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            // 下线成功
            return true;
        }
    } finally {
        // 释放锁
        read.unlock();
    }
}

internalCancel 方法与 register 方法的行为过程很类似,首先通过 register 根据服务名和服务实例 id 查询关于服务实例的租约 Lease 是否存在,统计最近下线的服务实例用于 Eureka Server 主页展示。如果租约不存在,返回下线失败;如果租约存在,从 register 注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于 Eureka Client 的增量式获取注册表信息,最后设置 reposonse 缓存过期。

internalCancel 方法中同样通过读锁保证 register 注册表中数据的一致性,避免脏读。

4.6 集群同步

如果,Eurkea Server 是通过集群的方式进行部署,那么为了维护整个集群中 Eureka Server 注册表数据的一致性,势必需要一个机制同步 Eureka Server 集群中注册表的数据。

Eureka Server 集群同步包含两个部分:

  1. 一部分是 Eureka Server 在启动过程中从它的 peer 节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中。
  2. 另一部分是 Eureka Server 每次对本地注册表进行操作时,同时会将操作同步到它的 peer 节点中,达到集群注册表数据统一的目的。

4.6.1 Eureka Server 初始化本地注册表信息

在 Eureka Server 启动的过程中,会从它的 peer 节点中拉取注册表来初始化本地注册表,这部分主要通过 PeerAwareInstanceRegistry # syncUp 方法完成,他将从可能存在 peer 节点中,拉取 peer 节点中的注册表信息,并将其中的服务实例信息注册到本地注册表中,源代码如下:

public int syncUp() {
    // Copy entire entry from neighboring DS node
    // 从临近的 peer 中复制整个注册表
    int count = 0;
    // 如果获取不到,线程等待
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        // 获取所有的服务实例
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    // 判断是否可以注册,主要用于 AWS 环境下进行,若部署在其他环境,直接返回ture
                    if (isRegisterable(instance)) {
                        // 注册到自身的注册表中
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}

Eureka Server 也是一个 Eureka Client ,在启动的时候也会进行 DiscoveryClient 的初始化,会从其对应的 Eureka Server 中拉取全量的注册表信息。

在 Eureka Server 集群部署的情况下, Eureka Server 从它的 peer 节点中拉取到注册表信息后,将遍历这个 Applications,将所有的服务实例通过 AbstractRegistry # register 方法啊注册到自身注册表中。

在初始化本地注册表时,Eureka Server 并不会接受来自 Eureka Client 的通信请求(如注册、或者获取注册表信息等请求)。 在同步注册表信息结束后会通过 PeerAwareInstanceRegistryImpl # openForTraffic 方法允许该 Server 接受流量。源代码如下:

public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 初始化自我保护机制统计参数
    this.expectedNumberOfRenewsPerMin = count * 2;
    this.numberOfRenewsPerMinThreshold =
            (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    // 如果同步的应用实例数量为 0 ,将在一段时间内拒绝 Client 获取注册信息
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    // 判断是否 AWS 预序环境,可忽略该部分
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 修改服务实例的状态为健康上线,可以接受流量
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}

在 Eureka Server 中有一个 StatusFilter 过滤器,用于检查 Eureka Server 的状态,当 Server 的状态不为 UP 时,将拒绝所有的请求。

在 Client 请求获取注册表信息时,Server 会判断此时是否允许获取注册表中的信息。这种做法是为了避免 Eureka Server 在 syncUp 方法中没有获取到任何服务实例信息时(Eureka Server 集群部署的情况下),Eureka Server 注册表中的信息影响到 Eureka Client 缓存的注册表中的信息。

如果 Eureka Server 在 syncUp 方法中没有获取任何的服务实例信息,它将把 peerInstancesTransferEmptyOnStartup 设置为 true,这时该 Eureka Server 在 WaitTimeInMsWhenSyncEmpty(可以通过 eureka.server.wait-time-in-ms-when-sync-empty 设置,默认是 5 分钟)时间后才能被 Eureka Client 访问获取注册表信息。

4.6.2 Eureka Server 之间注册表信息的同步复制

为了保证 Eureka Server 集群运行时注册表信息的一致性,每个 Eureka Server 在对本地注册表进行管理操作时,会将相应的操作同步到所有 peer 节点中。

在 PeerAwareInstanceRegistryImpl 中,对 Abstractinstanceregistry 中的 register、cancel 和 renew 等方法都添加了同步到 peer 节点的操作,使 Server 集群中注册信息保持最终一致性,部分源代码如下:

下线

public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    // ...
    // 同步下线状态
    replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
    // ...
}

注册

public void register(final InstanceInfo info, final boolean isReplication) {
    // ...
    // 同步注册状态
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

续约

public boolean renew(final String appName, final String id, final boolean isReplication) {
    // ...
    // 同步续约状态
    replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
    // ...
}

同步的主要操作有(枚举类):

public enum Action {
    Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
    // ...
}

每个同步方法都是调用如下方法:

private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        // 如果 peer 集群为空,或者本来就是复制操作,那么久不在复制,防止造成循环复制
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }
        // 向 peer 集群中的每一个peer 进行同步
        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            // 如果 peer 系欸但时自身的话,不进行同步复制
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            // 根据 Action 调用不同的同步请求
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

peerEurekaNode 代表一个可同步共享数据的 Eureka Server。在 PeerEurekaNode中,具有 register、cancel、heartbeat 和 statueUpdate 等诸多用于向 peer 节点同步注册表信息的操作。

在 replicateInstanceActionsToPeers 方法中根据 action 的不同,调用 PeerEurekaNode 的不同方法进行同步复制,代码如下所示:

private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                // 同步下线
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步心跳
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                // 同步注册
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 同步跟新状态
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                // 删除实例覆盖状态
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}

PeerEurekaNode 中的每一个同步复制都是通过比任务流的方式进行操作,同一时间段内相同服务实例的相同操作将使用相同的任务编号,在进行同步复制的时候根据任务编号合并操作,减少同步操作的数量和网络消耗,但是同步也造成同步复制的延时性,不满足 CAP 中的 C(强一致性)。

通过 Eureka Server 在启动过程中初始化本地注册表信息和 Eureka Server 集群间的同步复制操作,最终达到了集群中 Eureka Server 注册表信息一致的目的。

4.7 获取注册表中服务实例信息

Eureka Server 中获取注册表的服务实例信息主要通过两个方法实现:

  1. AbstractInstanceRegistry # getApplicationsFromMultipleRegions 从多地区获取全量注册表数据
  2. AbstractInstanceRegistry # getApplicationDeltasFromMultipleRegions 从多地区获取增量式注册表数据

4.7.1 getApplicationsFromMultipleRegions

getApplicationsFromMultipleRegions 方法将会从多个地区中获取全量注册表信息,并封装成 Application 返回,源代码如下:

public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {

    boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;

    logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
                 includeRemoteRegion, remoteRegions);

    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
    } else {
        GET_ALL_CACHE_MISS.increment();
    }
    Applications apps = new Applications();
    apps.setVersion(1L);
    // 从本地registry获取所有的服务实例信息工nstanceinfo
    for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
        // ...
    }
    if (includeRemoteRegion) {
        // 获取远程 Region 中的 Eureka Server 中的注册表信息
        // ...
    }
    apps.setAppsHashCode(apps.getReconcileHashCode());
    return apps;
}

它首先会将本地注册表 register 中的所有服务实例信息提取出来封装到 Applications 中,再根据是否需要拉取远程 Regist 中的注册表信息,将远程 Region 的 Eureka Server 注册表中的服务实例信息添加到 Application 中。最后将封装了全量注册表信息的 Applications 返回给 Client。

4.7.2 getApplicationDeltasFromMultipleRegions

getApplicationDeltasFromMultipleRegions 方法将会从多个地区中获取增量式注册表信息,并封装成 Applications 返回,代码如下:

public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
    if (null == remoteRegions) {
        remoteRegions = allKnownRemoteRegions; // null means all remote regions.
    }

    boolean includeRemoteRegion = remoteRegions.length != 0;

    if (includeRemoteRegion) {
        GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
    } else {
        GET_ALL_CACHE_MISS_DELTA.increment();
    }

    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        // 开启写锁
        write.lock();
        // 遍历 recentlyChangedQueue 队列获取最近变化的服务实例信息 InstanceInfo
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
        while (iter.hasNext()) {
            // ...
        }
        // 获取远程Region 中的 Eureka Server 的增量式注册表信息
        if (includeRemoteRegion) {
            // ...
        }

        Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
        // 计算应用集合一致性哈希码,用以在Eureka Client 拉取时进行对比
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

获取增量式注册表信息将会从 recentlyChangedQueue 中获取最近变化的服务实例信息。recentlyChangedQueue 中统计了进 3 分钟内进行注册、修改和剔除的服务实例信息,在服务注册 AbstractInstanceRegistry # registry 、接受心跳请求 AbstractInstanceRegistry # renew 和服务下线 AbstractInstanceRegistry # internalCancel 等方法中均可见到 recentlyChangedQueue 对这些服务实例进行登记,用于记录增量式注册表信息。getApplicationDeltasFromMultipleRegions 方法同样提供了从远程 Region 的 Eureka Server 获取增量式注册表信息的能力。

五、最后

上面对 Eureka Client 及 Eureka Server 的主要功能进行分析(读书笔记),篇幅应该说是非常的长了,所以本篇内容还是需要长时间的研读并结合源码去理解。

对于书中的后续内容“Eureka 进阶”就不打算放出来了,太耗费时间去整理了,如果大家感兴趣可以根据末尾的参考资料去阅读,这就都不多说明了哈!

好了,今天的内容到这里就结束了,关注我,我们下期见


参考资料

资料一:《SpringCloud微服务架构进阶》

资料二:https://www.jianshu.com/p/be18c452be2b

扩展阅读

  • 由于博主才疏学浅,难免会有纰漏,假如你发现了错误或偏见的地方,还望留言给我指出来,我会对其加以修正。
  • 如果你觉得文章还不错,你的转发、分享、点赞、留言就是对我最大的鼓励。
  • 感谢您的阅读,十分欢迎并感谢您的关注。

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

CSDN:J3 - 白起

这是一个技术一般,但热衷于分享;经验尚浅,但脸皮够厚;明明年轻有颜值,但非要靠才华吃饭的程序员。

^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^


  1. CAP理论 ??

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 11:56:40  更:2021-07-26 11:57:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/26 16:02:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码