- J3 - 白起
- 技术 # 笔记 # 小白学微服务 # Eureka
上篇我对微服务方案实现的其中之一SpringCloud进行了介绍,相信大家已经对它有个大致的了解了。👉《小白学微服务》之,什么是Spring Cloud Netflix
比如,SpringCloud不是一个技术,而是一个技术的集合体;再比如SpringCloud组件中的各个技术都是以SpringBoot为基础实现的等。
那既然知道了这些,我就开始介绍SpringCloud中的第一个组件了-Eureka 。
一、是什么
SpringCloud Eureka 是 SpringCloud Netflix 微服务套件的一部分,基于 Netflix Eureka 做了二次封装,主要负责实现微服务架构中的服务治理功能。
SpringCloud Eureka 是一个基于 REST 的服务,并且提供了基于 Java 的客户端组件,能够非常方便地将服务注册到 SpringCloud Eureka 中进行统一管理。
在微服务架构体系中,服务治理是必不可少的。在服务治理中,服务注册中心是一定要有的,而除了用 Eureka 作为注册中心之外,还可以用,Consul、Etcd、Zookeeper 等来作为服务的注册中心。
当然,在常见的可以作为服务注册中心的组件中,Zookeeper 和 Eureka 一样都是使用的比较多的。那为什么SpringCloud不用 Zookeeper 当作注册中心而选用 Eureka呢!
这就不得不提一个在分布式领域中的著名定理CAP 了。
1.1 CAP定理
CAP定理又称CAP原则,指的是在一个分布式系统中,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),最多只能同时满足三个特性中的两个,三者不可兼得。
data:image/s3,"s3://crabby-images/d9967/d9967065f2043ed577a58d66d9f2a4c1fe8045f6" alt="在这里插入图片描述"
- C:Consisteny(一致性),比如数据库是主从模式,一个写库请求进来了,master库完成了写入操作,但是再slave同步数据之前,另一个用户查了这条数据,结果没查到,但是也没报错,这就不是
强一致性 。虽然最终会同步成功,但这是最终一致性 的体现。强一致性的体现在于我不管你因为什么没同步成功(可能网络延迟或其他等),只要没同步成功,我这个slave就不能对外提供服务。必须主从数据一致才可以提供服务。(很少有做到这点的) - A:Availability(可用性),还是上面的例子,就是保证了可用性。因为虽然主从没同步完成,但是我从库照样能提供服务而且及时响应结果。也就是说可用性保证服务可用,而不在乎数据是否一致。明显和C是冲突的,那CA怎么还能组合到一起?其实是可以的(单机部署)。
- P:Partition Tolerance(分区容错性),集群部署了三台服务。挂了一台,其他两台还能继续对外提供服务,这时候我就认为他是没问题的,也就是我能容忍你挂了一台,只要还有服务能对外提供请求即可。所以一般分区容忍性是必须的,一般都需要从C和A之间做选择。
那么现在两两组合的话就有下面三种情况:
-
CP :满足一致性和分区容忍性,常见的就是 Zookeeper 集群。 -
AP :满足可用性和分区容忍性,创建的就是 Eureka 集群。 -
CA :满足一致性和可用性,适用单机部署系统,扩展性不强。
至于在分布式微服务系统中如何抉择,就没有很好的定论了。所以在设计系统架构的时候,我们应该根据系统具体的业务场景来权衡CAP。
只有适合的才是最好的,切不可为了追求完美而浪费太多的时间精力在CAP的抉择上。
更多CAP内容可看扩展阅读一
1.2 为什么SpringCloud注册中心选择Eureka
根据上面的CAP理论,我们知道 zookeeper 和 Eureka 在设计的时候就已经对分布式场景作出了一些取舍。
zookeeper的CP保证
当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟以前的注册信息,但不能接受服务直接down掉不可用。也就是说,服务注册功能对可用性的要求要高于一致性。但是zk会出现这样一种情况,当master节点因为网络故障与其他节点失去联系时,剩余节点会重新进行leader选举。问题在于,选举leader的时间太长,30 ~ 120s,且选举期间整个zk集群都是不可用的,这就导致在选举期间注册服务瘫痪。在云部署的环境下,因网络问题使得zk集群失去master节点是较大概率会发生的事,虽然服务能够最终恢复,但是漫长的选举时间导致的注册长期不可用是不能容忍的。
Eureka的AP保证
Eureka看明白了这一点,因此在设计时就优先保证可用性。Eureka各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。而Eureka的客户端在向某个Eureka注册或时如果发现连接失败,则会自动切换至其它节点,只要有一台Eureka还在,就能保证注册服务可用(保证可用性),只不过查到的信息可能不是最新的(不保证强一致性)。除此之外,Eureka还有一种自我保护机制,如果在15分钟内超过85%的节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,此时会出现以下几种情况:
- Eureka不再从注册列表中移除因为长时间没收到心跳而应该过期的服务
- Eureka仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上(即保证当前节点依然可用)
- 当网络稳定时,当前实例新的注册信息会被同步到其它节点中
结论:Eureka可以很好的应对因网络故障导致部分节点失去联系的情况,而不会像Zookeeper那样使整个注册服务瘫痪。
1.3 Eureka架构
Eureka1.x版本架构图
data:image/s3,"s3://crabby-images/7785b/7785b243db7d62b4fec57bd839eec6c8d82ce0ce" alt="在这里插入图片描述"
从架构图中我先来说一下Eureka的分区 。
Eureka 提供了 Region(区域) 和 Availability Zone(可用区) 两个概念来进行分区,这两个概念均来自于亚马逊的 AWS:
region:可以理解为地理上的不同区域,比如亚洲地区,中国区或者深圳等等。没有具体大小的限制,根据项目具体的情况,可以自行合理划分 region。图中us-east-1c、us-east-1d、us-east-1e就是一个个不同的区域。
Availability Zone:可以简单理解为 region 内的具体机房,比如说 region 划分为深圳,然后深圳有两个机房,就可以在此 region 之下划分出 zone1、zone2 两个 zone。
了解分区之后,可以知道 Eureka 之间的行为非常之多,具体如下:
Application Service :由名字可知,它是一个服务的提供者,同时也是一个Eureka Client。作用,扮演服务提供角色,提供业务服务,向 Eureka Server 注册和更新自己的信息,同时能从 Eureka Server 注册表中获取到其它服务信息。Eureka Server :扮演服务注册中心的角色,提供服务注册和发现的功能。每个 Eureka Client 向 Eureka Server 注册自己的信息,也可以通过 Eureka 获取到其它服务的信息达到发现和调用其它服务的目的。Application Client :是一个 Eureka Client ,扮演服务消费者的角色,通过 Eureka Server 获取注册到其上的其它服务信息,从而根据信息找到所需的服务发起远程调用。Replicate :Eureka Server 之间注册表信息的同步复制,使 Eureka Server 集群中不同注册表中的服务实例信息保持一致。Make Remote Call :服务之间的远程调用。Register :注册服务实例,Client 端向 Server 端注册自身的元数据以提供服务发现。Renew :续约,通过发送心跳到 Server 以维持和更新注册表中服务实例元数据的有效性。当在一定时长内,Server 没有收到 Client 的心跳信息,将默认服务下线,会把服务实例信息从注册表中删除。Cancel :服务下线,Client 再关闭时主动向 Server 注销服务实例元数据,这是 Client 的服务实例数据信息将从注册表中删除。Get Registry :获取注册表,Client 向 Server 请求注册表信息,用于服务发现,从而发起服务之间的远程调用。
这些行为在 Eureka 中发挥着重要的作用,那了解了以上的这些内容相信Eureka中各位的脑海中有了一个大致的轮廓的。比如Eureka是一个服务治理框架,用于服务注册发现,强调AP、用分区的架构来实现集群等。
二、基础应用
好记性不然烂笔头,虽然用在这里不恰当,但明白我的意思就行,讲了很多,下面就来实操搭建一下Eureka的Server,Client端应用程序。
一点说明:
2.1 项目基础模块规划
因为 Eureka 是 SpringCloud 的一个组件,所以前期肯定是搭建一个微服务项目,而对于微服务项目,依照本人的搭建习惯,会将项目的目录结构做如下调整:
spring-cloud-study(父工程)
|
----server(业务模块)
----server-user(业务具体模块)
----server-XXXX
|
----spring-cloud(cloud模块)
----spring-cloud-eureka(cloud的各个组件)
----spring-cloud-config
----spring-cloud-XXXX
既然目录结构清晰了,那先创建父工程,业务模块,cloud 模块出来
data:image/s3,"s3://crabby-images/b7ae8/b7ae8297b27f017c9ba4e78ef95a75efd6e9e396" alt="在这里插入图片描述"
1)父工程 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/>
</parent>
<groupId>cn.baiqi</groupId>
<artifactId>springcloud-study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>springcloud-study</name>
<modules>
<module>spring-cloud</module>
<module>server</module>
</modules>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.SR3</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2)业务模块 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>springcloud-study</artifactId>
<groupId>cn.baiqi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>server</name>
<packaging>pom</packaging>
<modules>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3)cloud 模块 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>springcloud-study</artifactId>
<groupId>cn.baiqi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud</name>
<packaging>pom</packaging>
<modules>
</modules>
<dependencies>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
到此项目的架子算是搭建起来了,创建聚合工程的时候我们要注意如下几点
- 父级项目只做聚合,不写任何代码,打包方式为 pom
- 父级项目的 pom 文件中要聚合子项目将其下的子项目聚合到
<modules> 标签中 - 子模块的
<parent> 依赖要写父级坐标,打包方式为 jar
现在可以往项目中添加一个个的组件模块了。
2.2 Eureka注册中心搭建
首先我们在 spring-cloud 模块中添加一个子模块,用来当作 Eureka 注册中心,子模块名:spring-cloud-eureka
1)创建 SpringBoot 项目,目录结构如下:
data:image/s3,"s3://crabby-images/54d05/54d0500eae1a3109a3ad7042dc1412c9d39cba92" alt="在这里插入图片描述"
2)编写 pom 文件,并导入 Eureka 服务端依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>spring-cloud</artifactId>
<groupId>cn.baiqi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-eureka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-cloud-eureka</name>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3)编写配置文件:application.yml
server:
port: 9000
spring:
application:
name: spring-cloud-eureka
eureka:
client:
service-url:
defaultZone: http://localhost:9000/eureka
register-with-eureka: false
fetch-registry: false
4)启动类上添加@EnableEurekaServer 注解
@EnableEurekaServer
@SpringBootApplication
public class SpringCloudEurekaApplication {
public static void main(String[] args) {
SpringApplication.run(SpringCloudEurekaApplication.class, args);
}
}
5)启动 Eureka 注册中心服务,访问:http://localhost:9000/,看到如下页面即注册中心搭建完成。
data:image/s3,"s3://crabby-images/481ec/481ece784afd5a80913c4dac07dc61f0083d9f34" alt="在这里插入图片描述"
注册中心搭建好之后,其它服务就可以往这个注册中心注册服务了,按照习惯我准备了两个业务服务:
- server-user(调用者)
- server-order(提供者)
下面分别搭建一下把。
2.3 Eureka Client搭建-服务提供者
业务服务名:server-order
1)创建 SpringBoot 项目,目录结构如下:
data:image/s3,"s3://crabby-images/c890e/c890e0aad74f357ac0dbecea4e1a151b6d57aaa3" alt="在这里插入图片描述"
2)因为 server-order 的父项目是 server ,所以只需在 server 中的 pom 里添加 Eureka 的客户端依赖,这样只要是 server 下面的项目,都会因为依赖传递而获取到相关的依赖,记住别忘了将子项目聚合到父项目中(<modules> 标签里)
server-order 的 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>server</artifactId>
<groupId>cn.baiqi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>server-order</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>server-order</name>
<dependencies>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
server 的 pom 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>springcloud-study</artifactId>
<groupId>cn.baiqi</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>server</name>
<packaging>pom</packaging>
<modules>
<module>server-user</module>
<module>server-order</module>
</modules>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3)配置文件:application.yml
server:
port: 9001
eureka:
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 10
prefer-ip-address: true
client:
service-url:
defaultZone: http://localhost:9000/eureka
spring:
application:
name: server-order
logging:
level:
org.springframework: debug
4)添加一个 Controller,对外提供调用
@RestController
@RequestMapping("/order")
public class OrderController {
@GetMapping("/getAll/{value}")
public String getAll(@PathVariable String value) {
return "order服务提供了相关数据:" + value;
}
}
该接口被调用后会返回相应的数据
5)Eureka Client 端在启动类中不需要添加 @EnableXXXX 注解,只要添加了 Client 依赖,它就是一个 Eureka Client 应用,直接启动就行了。
2.4 Eureka Client搭建-服务调用者
业务服务名:server-user
1)创建 SpringBoot 项目,目录结构如下:
data:image/s3,"s3://crabby-images/6c347/6c3476db1ce20ecb053f81544827102f1794c7c3" alt="在这里插入图片描述"
2)server-user 模块的 pom 文件同服务提供者一样,不赘述,server 的 pom 文件只需聚合子项目模块就行。
3)配置文件:application.yml
server:
port: 9002
eureka:
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 10
prefer-ip-address: true
client:
service-url:
defaultZone: http://localhost:9000/eureka
spring:
application:
name: server-user
logging:
level:
org.springframework: debug
4)添加一个调用 Controller,获取相关数据
@RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private RestTemplate restTemplate;
@GetMapping("/getUserOrder/{userId}")
public String getUserOrder(@PathVariable Long userId) {
String url = "http://server-order/order/getAll/{userId}";
String body = restTemplate.getForEntity(url, String.class, 1L).getBody();
return body;
}
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return new RestTemplate();
}
}
访问该接口就可以调用对应的服务提供者获得相关数据。
5)Eureka Client 端在启动类中不需要添加 @EnableXXXX 注解,只要添加了 Client 依赖,它就是一个 Eureka Client 应用,直接启动就行了。
2.5 启动测试
依次启动应用,访问 Eureka 的注册中心主页,界面如下:
data:image/s3,"s3://crabby-images/03a0f/03a0fdf1899eeb41658d1cf1edfdab3fddfe98a1" alt="在这里插入图片描述"
访问:http://localhost:9002/user/getUserOrder/2,结果如下
order服务提供了相关数据:2
RestTemplate 将根据服务名 server-order 通过预先从 spring-cloud-eureka 缓存到本地的注册表中获取到 server-order 服务的具体地址 ,从而发起服务间调用。
三、Eureka Client 源码解析
通过上面的服务调用案例,我们基本上不用做过多的复杂操作,即可完成一个远程的服务调用。
所以 Eureka Client 为了简化开发人员的工作,将很多与 Eureka Server 交互的工作都隐藏起来了,自主完成。
下面看看各个阶段,Eureka Client 在后台自动完成的工作图:
data:image/s3,"s3://crabby-images/68b0e/68b0ec222885cd9895707b335aa3c16c8219fba9" alt="在这里插入图片描述"
对于 Eureka Client 而言,它没有在启动类上加 @EnableXXXX 注解依然可以正常使用其相关的功能,那么我们肯定就想到了 SpringBoot 的自动装配原理。
找到 Eureka Client 源码包中的 spring.factories 文件就可以知道在启动的时候,自动配置了那些功能。
data:image/s3,"s3://crabby-images/5c591/5c591f66a644145e19bf9fdb8bde4e938b0a0a02" alt="在这里插入图片描述"
在自动装配类中,我们关注下面三个配置类:
EurekaClientAutoConfiguration :Eureka Client 自动配置类,负责Eureka Client 中关键Beans的配置和初始化,如ApplicationManager 和 EurekaClientConfig 等。RibbonEurekaAutoConfiguration :Ribbon 负载均衡相关配置。EurekaDiscoveryClientConfiguration :配置自动注册和应用的健康检测器。
3.1 读取应用自身配置
EurekaClientAutoConfiguration 配置类中定义了非常多的Bean,在 SpringBoot 启动的时候就会完成这些Bean的读取和配置,主要的配置如下。
类名 | 作用 |
---|
EurekaClientConfig | 封装 Eureka Client 与 Eureka Server 交互所需要的配置信息。SpringCloud 为其提供了一个默认配置类的 EurekaClientConfigBean,可以在配置文件中通过前缀 eureka.client+属性名进行属性覆盖。 | ApplicationInfoManager | 作为应用信息管理器,管理服务实例的信息类 InstanceInfo 和服务实例的配置信息类 EurekaInstanceConfig。 | InstanceInfo | 封装将被发送到 Eureka Server 进行服务注册和服务实例元数据。它在 Eureka Server 的注册表中代表一个服务实例,其他服务实例可以通过 InstanceInfo 了解该服务实例的相关信息从而发起服务请求。 | EurekaInstanceConfig | 封装 Eureka Client 自身服务实例的配置信息,主要用于构建 InstanceInfo 通常这些信息在配置文件中的 eureka.instance 前缀下进行设置,SpringCloud 通过 EruekaInstanceConfigBean 配置类提供默认配置。 | DiscoveryClient | SpringCloud 中定义用来服务发现的客户端接口。 |
EurekaClientAutoConfiguration 配置类代码:
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class})
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
}
看到这么多注解不要慌,有些不是我们的关注点,我们只需要关注下面几点就可:
关注点一:@ConditionalOnBean 注解作用,该类在加载之前环境中必须要有EurekaClientConfig.class ,EurekaDiscoveryClientConfiguration.Marker.class 两个类,否则不加载。
关注点二:@ConditionalOnProperty 注解作用,配置文件中eureka.client.enabled 属性的值为 ture 时,该配置类才生效,而该属性默认值就为true,所以该属性配置可以忽略。这也是我们不用在 Eureka 客户端程序的启动类上添加@EnableEurekaClient 注解,程序依然生效的原因。
关注点三:@AutoConfigureBefore 和@AutoConfigureAfter 注解作用,该类的加载的顺序为@AutoConfigureAfter 注解中定义的配置类之后,然后再是该配置类,最后再是@AutoConfigureBefore 注解中定义的类加载。
- 加载顺序:
@AutoConfigureAfter ==> EurekaClientAutoConfiguration ==> @AutoConfigureBefore
在加载@AutoConfigureAfter 注解时,当加载到EurekaDiscoveryClientConfiguration 后,EurekaClientAutoConfiguration 配置类就满足了所有定义的条件,即生效了。
data:image/s3,"s3://crabby-images/a6589/a65893bdeb3af2952f1996a5b5b013853a41947b" alt="在这里插入图片描述"
3.2 客户端发现
先来了解一下DiscoveryClient 接口,和DiscoveryClient 类。
DiscoveryClient 接口是 SpringCloud 框架提供的,主要为了扩展 Netflix 提供的 Eureka 客户端而提供的,该接口的实现类通过组合的方式引入了 Netflix 提供的 DiscoveryClient 类,然后进行了进一步封装,让开发者更加容易使用 SpringBoot 进行基于 Eureka 的开发。
DiscoveryClient 类是 Netflix 开源框架提供的,主要用于与Eureka服务端(即注册中心)进行交互。
DiscoveryClient 接口和其默认实现类EurekaDiscoveryClient 代码
DiscoveryClient.java
public interface DiscoveryClient {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
EurekaDiscoveryClient.java # getInstances()
public class EurekaDiscoveryClient implements DiscoveryClient {
private final EurekaClient eurekaClient;
@Override
public List<ServiceInstance> getInstances(String serviceId) {
List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId,
false);
List<ServiceInstance> instances = new ArrayList<>();
for (InstanceInfo info : infos) {
instances.add(new EurekaServiceInstance(info));
}
return instances;
}
}
EurekaDiscoveryClient 实现了 DiscoveryClient 接口,通过 getInstances() 方法会发现,其中又调用了 Netflix 提供的 EurekaClient 接口这种组合形式的方式使得我们非常容易的进行 Eureka 开发。
data:image/s3,"s3://crabby-images/b0c30/b0c3028cac25e826a244effdbb386976f789640c" alt="在这里插入图片描述"
通过组合形式,我们就将代码拉回到了 Netflix 包中。
EurekaClient 接口来自于 com.netflix.discovery 包中,默认实现为 com.netflix.discovery.DiscoveryClient 它属于 eureka-client的源代码。
DiscoveryClient 可以说时功能非常的强大,提供了 Eureka Client 注册到 Server上、续租、下线、及获取Server中注册表信息等诸多关键功能。
下面继续分析这个类。
3.3 DiscoveryClient类分析
DiscoveryClient 是 Eureka Client 的核心类,负责与 Eureka Server 交互的关键逻辑,具体功能如下:
- 注册服务实例到 Eureka Server 中
- 发送心跳更新与 Eureka Server 的续约
- 在服务关闭时从 Eureka Server 中取消续约,服务下线
- 查询在 Eureka Server 中注册的服务实例列表
上面有该类的类图👆。
DiscoveryClient 继承了 LookupService 接口, LookupService 作用是发现活跃的服务实例, 主要方法如下:
public interface LookupService<T> {
Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
}
Application :持有服务实例信息列表,它可以理解成同一个服务的集群信息,这些服实例都挂在同一个服务名 appName 下。Applications :表示注册表中所有服务实例信息的集合InstanceInfo :表示一个服务实例信息。
Application 和Applications 类中,对InstanceInfo 的操作都是同步操作,防止出错。
EurekaClient 在LookupService 接口的基础上扩充了更多的接口,并提供了非常丰富的获取服务实例的方式,下面主要关注两个接口。
@ImplementedBy(DiscoveryClient.class)
public interface EurekaClient extends LookupService {
public void registerHealthCheck(HealthCheckHandler healthCheckHandler);
public void registerEventListener(EurekaEventListener eventListener);
}
在 Eureka Server 中一般通过心跳(heartbeats)来识别一个实例的状态。Eureka Client 中存在一个定时任务定时通过 HealthCheckHandler 检测当前 Client 的状态,如果 Client 的状态发生改变,将会触发新的注册事件,更新 Eureka Server 的注册表中该服务的相关信息。
HealthCheckHandler 代码如下:
public interface HealthCheckHandler {
InstanceInfo.InstanceStatus getStatus(InstanceInfo.InstanceStatus currentStatus);
}
Eureka 中的事件监听模式属于观察者模式,事间监听器将监听 Client 的服务实例信息变化,触发对应的处理事件,EurekaEvent类图如下。
data:image/s3,"s3://crabby-images/da1e7/da1e7179cfb99c4ff524ba63cd4214396a5f4ab7" alt="在这里插入图片描述"
3.4 DiscoveryClient构造函数
咱们再回到EurekaClientAutoConfiguration 类中。
当程序加载该类的时候,关注如下代码:
public class EurekaClientAutoConfiguration {
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance,
@Autowired(required = false) HealthCheckHandler healthCheckHandler) {
ApplicationInfoManager appManager;
if (AopUtils.isAopProxy(manager)) {
appManager = ProxyUtils.getTargetObject(manager);
}
else {
appManager = manager;
}
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
this.context);
cloudEurekaClient.registerHealthCheck(healthCheckHandler);
return cloudEurekaClient;
}
}
该类加载时,Eureka Client 的初始化入口就是从这里开始进入的,具体的说,可以是下面这都段代码:
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
this.context);
进入 CloudEurekaClient 类的构造器,可以确认最终它调用的是 DiscoveryClient 类的构造器。
DiscoveryClient 构造器所做的功能非常多,包含:
- Eureka Client 从 Eureka Server 中拉取注册表信息
- 服务注册
- 初始化发送心跳
- 缓存刷新(重新拉去注册表信息)
- 按需注册定时任务
上面所列的功能,可以说是贯穿了 Eureka Client 启动阶段的各项工作。
构造器代码如下:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
}
简单介绍一下构造参数:
ApplicationInfoManage r:上面提到过,它是应用信息管理器。EurekaClientConfig :上面提到过,封装了 Client 和 Server 交互配置信息的类。AbstractDiscoveryClientOptionalArgs :注入一些可选参数,和一些像 jersey1,jersey2等通用过滤器。BackupRegistry :备份注册中心的职责,当 Eureka Client 无法从任何一个Eureka Server 中获取注册表信息时,BackupRegistry将被调用以获取注册表信息。
从构造器代码往下看,忽略一些参数校验,属性赋值等之后,先来关注下面几行代码:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
return;
}
}
上面两个 if 判断语句中,就对应着 Eureka 拉取注册列表和注册服务,如果两个都为 flase 得话,那 Discovery 初始化直接结束,表示该客户端既不进行服务注册也不进行服务发现。
config.shouldFetchRegistry()对应的配置:eureka client.fetch.register
- 为 true 则表示将从 Eureka Server 中拉去注册表信息
config.shouldRegisterWithEureka()对应的配置:eureka.client.register.with-eureka
- 为 true 则表示将注册到 Eureka Server 中
往下看,接着定义了一个线程池 ScheduledExecutorService ,线程池大小为2,一个线程用于发送心跳,另一个线程用于缓存刷新,同时定义了发送心跳和缓存刷新线程池。
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(...);
cacheRefreshExecutor = new ThreadPoolExecutor(...);
往下看,初始化了 Eureka Client 和 Eureka Service 进行 HTTP 交互的 Jersey 客户端,将 AbstractDiscoveryClientOptionalArgs 中的属性用来构建 EurekaTransport。
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
往下看,接着从 Eureka Server 中拉取注册表信息。
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
如果 EurekaClientConfig # shouldFetchRegistry 为 ture 时,fetchRegistry 方法将被调用。
在 Eureka Client 向 Eureka Server 注册前,需要先从 Eureka Server 中拉取注册表中的信息,这是服务发现的前提。通过将 Eureka Server 中的注册表信息缓存到本地,就可以就近获取其它服务的相关信息,减少与 Eureka Server 的网络通信。
拉取完 Eureka Server 中的注册表信息后,将对服务实例进行注册,代码如下:
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
initScheduledTasks();
在服务注册之前会进行注册预注册,Eureka 没有对此提供默认实现。构造函数的最后将初始化并启动发送心跳,刷新缓存和按需注册等定时任务。
看到这里,DiscoveryClient 的构造函数中重要的功能步骤都讲解了一下,那咱们来总结一下在这里面所做的所有功能:
- 相关属性的赋值,如:ApplicationInfoManager,EurekaClientConfig等。
- 备份注册中心的初始化,默认没有实现。
- 拉取 Eureka Server 注册表中的信息
- 注册前的预处理
- 向 Eureka Server 注册自身
- 初始化心跳定时任务,缓存刷新和按需注册等定时任务
来一个清晰的流程图: data:image/s3,"s3://crabby-images/bb679/bb67995a4194fb30252fe3692bb58c5efcac7e2b" alt="在这里插入图片描述"
整体的 Eureka Client 初始化的功能都分析完了,下面根据具体的功能进行分析,如拉取注册信息,续约,心跳,注册等。
3.5 拉取注册表信息
拉取注册表的入口在,DiscoveryClient 类的 fetchRegistry 方法,相关代码如下:
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1))
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
onCacheRefreshed();
updateInstanceRemoteStatus();
return true;
}
当 Eureka 客户端第一次注册到 Eureka 服务端时才会进行全量拉取,其他时候都只进行增量拉取,下面来看看这两个拉取注册表信息的具体逻辑。
- DiscoveryClient # getAndStoreFullRegistry():全量拉取
- DiscoveryClient # getAndUpdateDelta():增量拉取
3.5.1 全量拉取注册表信息
DiscoveryClient # getAndStoreFullRegistry()代码如下:
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
全量拉取的方式将会从 Eureka Server 中拉取注册表中所有得服务实例信息(封装在 Applications中),并经过处理后替换掉本地注册表缓存Applications。
通过 debug 启动方式启动,我们可以看到向 Eureka Server 全量拉取服务注册表信息的 url 为:http://localhost:9000/eureka/apps,前面“http://localhost:9000/eureka”是配置文件配置的注册中心地址,后面“/apps/”是程序自己拼接的。
具体代码:AbstractJerseyEurekaHttpClient # getApplicationsInternal,截图如下。
data:image/s3,"s3://crabby-images/15d33/15d336ee2a352467538e16966ff29396cf0d4567" alt="在这里插入图片描述"
getAndStoreFullRegistry() 方法是有可能被多个线程同时调用的,这会导致拉取的注册表被旧的注册表所覆盖(有可能出现先拉取注册表信息的线程在覆盖 apps 时被阻塞,被后拉取注册表信息的线程抢先设置了 apps,被阻塞的线程恢复后在次设置了 apps,导致 apps 数据版本落后),产生脏数据,对此,Eureka 通过类型为 AtomicLong 的 currentUpdateGeneration 对 apps 的更新版本进行跟踪。如果跟新版本不一致,说明本次拉取注册表信息已过时,不需要缓存到本地。
最后对拉取到的注册表信息 apps 进行了筛选,只保留状态为 UP 的服务实例信息。
3.5.2 增量拉取注册表信息
DiscoveryClient # getAndUpdateDelta()代码如下:
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
增量更新注册表信息,访问 Eureka Server 的 url 为:http://localhost:9000/eureka/apps/delta,前面“http://localhost:9000/eureka”是配置文件配置的注册中心地址,后面“/apps/delta”是程序自己拼接的。
具体代码:AbstractJerseyEurekaHttpClient # getApplicationsInternal,截图如下。
data:image/s3,"s3://crabby-images/b2354/b23546e65a96b331485ad77f51e411eba0a728ea" alt="在这里插入图片描述"
由于更新的过程时间比较久,时间成本为O(N^2),所以需要通过同步代码快防止多个线程同时进行更新,污染数据。
在根据从 Eureka Server 拉取的 delta 信息更新本地缓存的时候,Eureka 定义了 ActionType 来标记变更状态,代码在 InstanceInfo 类中,代码如下:
public enum ActionType {
ADDED,
MODIFIED,
DELETED
}
在更新本地缓存的时候,根据 InstanceInfo # ActionType 的不同,对 delta 中的 InstanceInfo 采取不同的操作,其中 ADDED 和 MODIFIED 状态变更的服务实例信息将添加到本地注册标中,DELETED 状态变更的服务实例将从本地注册表中删除,具体代码如下:
private void updateDelta(Applications delta) {
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
} else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
}
}
在更新完本地缓存的注册表中之后,Eureka Client 会通过 DiscoveryClient # getReconcileHashCode(Applications applications) 方法计算合并后的 Applications 的 appHashCode (应用集合一直性哈希码),和 Eureka Server 传递的 delta 上的 appsHashCode 进行比较(delta 中携带的 appsHashCode 通过 Eureka Server 的全量注册表计算得出),比对客户端和服务端上注册表的差异。如果哈希值不一致的话将再次调用一次 getAndStoreFullRegistry 获取全量数据保证 Eureka Client 与 Eureka Server 之间注册表数据的一致。
private void getAndUpdateDelta(Applications applications) throws Throwable {
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);
}
}
在 reconcileAndLogDifference() 方法中将会执行全量拉取注册表信息操作
appsHashCode 的一般表示为:
appsHashCode = ${status}_ ${count} _
它通过将应用状态和数量拼接成字符串,表示了当前注册表中服务实例状态的统计信息。比如:有 10 个应用实例的状态为 UP ,有 5 个应用实例状态为 DOWN ,其中它的状态数量为 0 (不进行表示),那么 appsHashCode 的形式如下是:
appsHashCode = UP_ 10 _ DOWN _ 5 _
3.4.3 拉取注册表流程图
data:image/s3,"s3://crabby-images/e8672/e8672032c522d147afb0e1c47ca69e54ec5a878c" alt="在这里插入图片描述"
3.6 服务注册
在拉取完 Eureka Server 中的注册表信息并将其缓存在本地后,Eureka Client 将向 Eureka Server 注册自身服务实例元数据,主要逻辑在 DiscoveryClient # register 方法中。代码如下:
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
在注册的时候,Eureka CLient 会将自身信息封装成实例元数据(InstanceInfo中)发送到 Eureka Server 中请求服务注册,当 Eureka Server 返回 204 状态码,表示注册成功。
相关的注册 url 在追踪到 AbstractJerseyEurekaHttpClient # register 即可看出为,http://localhost:9000/eureka/apps/SERVER-USER,其中http://localhost:9000/eureka/是配置配置文件指定的注册中心地址,/apps/${app_name}则是具体的服务注册,参数为 InstanceInfo 实例名称。
debug 查看注册url,如下图:
data:image/s3,"s3://crabby-images/ff7bb/ff7bb93675ca3e1c37d981d849611c80fc7bcf77" alt="在这里插入图片描述"
3.7 初始化定时任务
在 Eureka Client 应用中,服务的注册是一个持续的过程,所以 Eureka Client 会通过定时发送心跳的方式于 Eureka Server 进行通信,维持自己在 Server 注册表上的续租。
同时,Eureka Server 注册表中的服务实例信息是动态变化的,为了保持 Eureka Client 与 Eureka Server 的注册表信息一致性,Eureka Client 会定时向 Eureka Server 拉取服务注册表信息并更新本地缓存。
并且 Eureka Cliten 为了监控自身应用信息和状态的变化,Eureka Client 设置了一个按需注册的定时器,定时检查自身应用信息活者状态变化,并在发生变化时向 Eureka Server 重新注册,避免注册表中的本服务实例信息不可用。
在 DiscoveryClient # initScheduledTasks 方法中初始化了三个定时器任务:
- 一个用于向 Eureka Server 拉取注册表信息刷新本地缓存
- 一个用于向 Eureka Server 发送心跳
- 一个用于进行按需注册操作
相关代码如下:
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
3.7.1 缓存刷新定时任务与发送心跳定时任务
在 DiscoveryClient # initScheduledTasks 方法中,通过 ScheduledExecutorService # schedule 的方式提交缓存刷新任务和发送心跳任务,任务执行的方式为延时执行并且不循环,这两个任务的定时循环逻辑由 TimedSupervisorTask 提供实现。
TimedSupervisorTask 继承了 TimeTask ,提供执行定时任务的功能。它在 run 方法中定义执行定时任务的逻辑,具体代码如下:
public class TimedSupervisorTask extends TimerTask {
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
rum 方法中主要实现功能就是定时执行一个任务调度,其过程如下:
schedule 初始化并延迟执行 TimedSupervisorTask 。TimedSupervisorTask 将 task 提交 executor 中执行,task 和 executor 在初始化 TimedSupervisorTask 时传入。- 当 task 正常执行,
TimedSupervisorTask 将自己提交到 schedule ,延迟 delay 时间后再此执行。 - 当 task 执行超时,计算新的 delay 时间 (不超过 maxDelay ),
TimedSupervisorTask 将自己提交到 schedule ,延迟 delay 时间后再次执行。
其执行流程图:
data:image/s3,"s3://crabby-images/a330d/a330d8702d78d885b52ff562d800f3d08acb5dde" alt="在这里插入图片描述"
TimedSupervisorTask 通过这种不断循环提交任务的方式,完成定时执行任务的功能。
在 DiscoveryClient # initScheduledTasks 方法中,提交缓存刷新定时任务的线程任务为 CacheRefreshThread ,提交发送心跳定时任务的线程为 HeartThread 。
CacheRefreshThread 继承了 Runnable 接口,代码如下:
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
@VisibleForTesting
void refreshRegistry() {
try {
boolean success = fetchRegistry(remoteRegionsModified);
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
}
CacheRefreshThread 线程任务将委托 DiscoveryClient # fetchRegistry 方法进行缓存化系的具体操作。
HeartThread 同样继承了 Runnable 接口,该任务的作用是向 Eureka Server 发送心跳请求,维持 Eureka Client 在注册表中的续约,代码如下:
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
可看出,其主要的逻辑代码在 renew() 方法中,代码如下:
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
Eureka Server 会根据续租提交的 appName 与 instanceInfoId 来更新注册表中的服务实例的续租。当注册表中不存在该服务实例时,将返回 404 状态码,发送心跳请求的 Eureka Client 在接受到 404 状态码后将会重新发起注册,如果续约成功,将会返回 200 状态码。
通过 debug 我们可以在 AbstractJerseyEurekaHttpClient # sendHeartBeat 方法中,可以发现服务续租调用的接口即传递的参数,如图:
data:image/s3,"s3://crabby-images/d8347/d834763a35a99b07c343231191b2bd39de2d564a" alt="在这里插入图片描述"
续租的 url :http://localhost:9000/eureka/apps/
A
P
P
N
A
M
E
/
{APP_NAME}/
APPN?AME/{INSTANCE_INFO_ID},HTTP方法为put,参数主要有 status(当前服务状态),lastDirtyTimestamp(上次数据变化时间)以及 overriddenStatus。
3.7.2 按需注册定时任务
按需注册定时任务的作用是当 Eureka Client 中的 InstanceInfo 或者 status 发生变化时,重新向 Eureka Server 发起注册请求,更新注册表中的服务实例信息,保证 Eureka Server 注册表中服务实例有效和可用。按需注册代码如下:
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
按需注册功能分为两部分:
- 一部分定义了一个定时任务,定时刷新服务实例的信息和检查应用状态的变化,在服务实例信息发生改变的情况下向 Eureka Server 重新发起注册操作。
- 一部分时注册状态改变监控器,在应用状态改变的情况时,刷新服务实例信息,在服务实例信息发生改变的情况下 Eureka Server 重新发起注册操作。
instanceInfoReplicator 中的定时任务逻辑位于 #run 方法中,如下所示:
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
其中 DiscoveryClient 中刷新本地服务实例信息和检查服务状态变化的代码如下(discoveryClient.refreshInstanceInfo):
void refreshInstanceInfo() {
applicationInfoManager.refreshDataCenterInfoIfRequired();
applicationInfoManager.refreshLeaseInfoIfRequired();
InstanceStatus status;
try {
status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
} catch (Exception e) {
logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
status = InstanceStatus.DOWN;
}
if (null != status) {
applicationInfoManager.setInstanceStatus(status);
}
}
在 instanceInfoReplicator # run 方法中,首先调用了 discoveryClient.refreshInstanceInfo 方法刷新当前的服务实例信息,查看当前服务实例信息和服务状态是否发生变化,如果当前服务实例信息或者服务状态发生变化将向 Eureka Server 重新发起服务注册操作。
最后再此声明了一下延时任务,用于再测调用 run 方法,继续检查服务实例信息和服务状态的变化,在服务实例信息发生变化的情况下重新发起注册。
如果 Eureka Client 的状态发生变化(在 SpringBoot 通过 Actuator 对服务状态进行监控,具体实现为 EurekaHealthCheckHandler ),注册在 ApplicationInfoManager 的状态改变监控器将会被触发,从而调用 InstanceInfoReplicator # onDemandUpdate 方法,检查服务实例信息和服务状态的变化,可能会引发按需注册任务。代码如下:
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
logger.debug("Executing on-demand update of local InstanceInfo");
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
logger.warn("Ignoring onDemand update due to stopped scheduler");
return false;
}
} else {
logger.warn("Ignoring onDemand update due to rate limiter");
return false;
}
}
InstanceInfoReplicator # onDemandUpdate 方法调用 InstanceInfoReplicator # run 方法检查服务实例信息和服务状态的变化,并在服务实例信息发生变化的情况下向 Eureka Server 发起重新注册的请求。
为了防止重新重复执行 run 方法, onDemandUpdate 方法还会取消执行上次已提交且为未完成的 run 方法,执行最新的按需注册任务。
按需注册定时任务的处理流程如图:
data:image/s3,"s3://crabby-images/782b0/782b0c0fe64cbc99532b293b53083a631eb6b06f" alt="在这里插入图片描述"
3.8 服务下线
服务下线就是 Eureka Client 向 Eureka Server 注销自身在注册表中的信息,DiscoveryClient 中对象在销毁前执行的清理方法如下:
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
在销毁 DiscoveryClient 之前,会进行一系列的清理工作,包括 ApplicationInfoManager 中的 StatusChangeListener、取消定时任务、服务下线和关闭 Jersey 客户端等。
下面我们主要看 unregister 服务下线方法,代码如下:
void unregister() {
if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
try {
logger.info("Unregistering ...");
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
} catch (Exception e) {
logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
}
}
}
其中调用了 AbstractJerseyEurekaHttpClient # cancel 方法中,可以发现服务下线调用的接口以及传递的参数,代码如下:
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder.delete(ClientResponse.class);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP DELETE {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
服务下线的接口地址为 apps/
A
P
P
N
A
M
E
/
{APP_NAME}/
APPN?AME/{INSTANCE_INFO_ID},参数为服务名称和服务实例id,HTTP方法为delete。
3.9 Eureka Client源码执行流程总图
data:image/s3,"s3://crabby-images/13dfa/13dfa24941adacdeccaff98bd6c59554c91aeb10" alt="在这里插入图片描述"
四、Eureka Server 源码解析
Eureak Server 作为一个开箱即用的服务注册中心,提供了以下功能,用以满足与 Eureka Client 交互的需求:
- 服务注册
- 接受服务心跳
- 服务剔除
- 服务下线
- 集群同步
- 获取注册表中服务实例信息
同时 Eureka Server 它也是一个 Eureka Client,所以在不禁用 Eureka Server 的客户端行为时,它是会向它配置文件中的其他 Eureka Server 进行拉取注册表、服务注册和发送心跳等操作。
下面先看注册表的类关系 InstanceRegistry ,为后面的服务注册、Eureka-Server 集群复制做整体的铺垫。
4.1 服务实例注册表
InstanceRegistry 是 Eureka Server 中注册表管理的核心接口,类结构图如下:
data:image/s3,"s3://crabby-images/95b59/95b5995413ee35b3069de60ef6cee21bc82eb35c" alt="在这里插入图片描述"
InstanceRegistry类 ,对 Eureka Server 的注册表实现类 PeerAwareInstanceRegistryImpl 进行了继承和扩展,使其适配 Spring Cloud 的使用环境,主要实现由 PeerAwareInstanceRegistryImpl 提供。InstanceRegistry接口 ,是 Eureka Server 注册表的最核心接口,其职责是在内存中管理注册到 Eureka Server 中的服务实例信息。LeaseManager接口 ,是对注册到 Eureka Server 中的服务实例租续进行管理。LookupService接口 ,是提供对服务实例进行检索,发现活跃的服务实例功能(Eureka Client源码介绍中讲过)。
从上而下,LeaseManager 接口提供的方法代码如下:
public interface LeaseManager<T> {
void register(T r, int leaseDuration, boolean isReplication);
boolean cancel(String appName, String id, boolean isReplication);
boolean renew(String appName, String id, boolean isReplication);
void evict();
}
LeaseManager 接口的作用是对注册到 Eureka Server 中的服务实例续约进行管理,分别有服务注册、服务下线、服务租续更新以及服务剔除等操作。
在 LeaseManager 中管理的对象是 Lease ,Lease 代表一个 Eureka Clietn 服务实例信息的租续,它提供了对其内持有的类的事件有效性操作。Lease 持有的类是代表服务实例信息的 InstanceInfo。Lease中定义了租约的操作,分别是注册、下线、更新,同时提供了对租约中事件属性的各项操作。租约默认的有效时长(duration)为 90 秒
InstanceRegistry接口 在继承 LeaseManager 和 LookupService 接口的基础上,还添加了一些特有的方法,可以更为简单地管理服务实例租约和查询注册表中的服务实例信息。可以通过 AbstractInstanceRegistry 查看 InstanceRegistry 接口方法的具体实现。
PeerAwareInstanceRegistry 继承了 InstanceRegistry 接口,在其基础上添加了 Eureka Server 集群同步的操作,其实现类 PeerAwareInstanceRegistryImpl 继承了 AbstractInstanceRegistry 的实现,在对本地注册表操作的基础上添加了对其 peer 节点的同步复制操作,使得 Eureka Server 集群中的注册表信息保持一致。
4.2 服务注册
Eureka Client 在发起服务注册时会将自身服务实例元数据封装在 InstanceInfo 中,然后将 InstanceInfo 发送到 Eureka Server。Eureka Server 在接收到 Eureka Client 发送的 InstanceInfo 后将会尝试将其放到本地注册表中以供其他 Eureka Client 进行服务发现。
服务注册的主要实现位于 AbstractInstanceRegistry # registry 方法中,代码如下:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
synchronized (lock) {
if (this.expectedNumberOfRenewsPerMin > 0) {
this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
registry 的结构:
registry:ConcurrentHashMap<appName, Map<instanceId, Lease<InstanceInfo>>>
在 registry 中,服务实例的 InstanceInfo 保存在 Lease 中,Lease 在 AbstractInstanceRegistry 中统一通过 ConcurrentHashMap 保存在内存中。
在服务注册过程中,会先获取一个读锁,防止其他线程对 registry 注册表进行数据操作,避免数据的不一致。然后从 resgitry 查询对应的 InstanceInfo 租续是否已经存在注册表中,根据 appName 划分服务集群,使用 InstanceId 唯一标记服务实例。如果租约存在,比较两个租约中的 InstanceInfo 的最后更新时间 lastDirtyTimestamp,保留时间戳最大的服务实例信息 InstanceInfo。如果租约不存在,意味着这是一个全新的服务注册,将会进行自我保护的统计,创建新的租约保存 InstanceInfo。接着将租约放到 resgitry 注册表中。
之后将进行一系列缓存操作并根据覆盖状态规则设置服务实例的状态,缓存操作包括将 InstanceInfo 加入用于统计 Eureka Client 增量式获取注册表信息得 recentlyChangedQueue 和失败 responseCache 中对应的缓存。最后设置服务实例租约的上线时间用于计算租约的有效时间,释放读锁完成服务注册。
在服务注册中,registry 方法为了防止数据被错误的覆盖而进行了大量的同步操作(读写锁,synchronized 锁)。
4.3 接受服务心跳
我们知道,Eureka Client 为了向服务端证明自己是有用的,会定时向 Eureka Server 发送一个心跳(默认30秒一次),以此来证明自己可用不被 Eureka Server 从注册表中剔除掉。
在 Eureka Server 中处理心跳请求的核心逻辑位于 AbstractInstanceRegistry # renew 。
renew 方法是对 Eureka Client 位于注册表中的租约的续租操作,不像 register 方法需要服务实例信息,进根据服务实例的服务名和服务实例 id 即可更新对应租约的有效时间,源码如下:
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
instanceInfo.getOverriddenStatus().name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
leaseToRenew.renew();
return true;
}
}
在 renew 方法中,不关注 InstanceInfo ,仅关注于租约本身以及租约的服务实例状态。如果根据服务实例的 appName 和 instanceInfoId 查询出服务实例的租约,并且根据 getOverriddenInstanceStatus 方法等到的 instanceStatus 不为 InstanceStatus.UNKNOWN 那么更新租约中的有效时间,即更新租约 Lease 中 lastUpdateTimestamp ,达到续约的目的;如果租约不存在,那么返回续租失败。
4.4 服务剔除
如果 Eureka Client 在注册后,既没有续约,也没有下线(服务崩溃或者网络异常等原因),那么服务的状态就处于不可知的状态,不能保证能够从该服务中获取到反馈,所以需要服务剔除 AbstractInstanceRegistry # evict 方法定时清理这些不稳定的服务,该方法会批量将注册表中所有过期续租剔除。源代码如下:
@Override
public void evict() {
evict(0l);
}
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
internalCancel(appName, id, false);
}
}
}
服务剔除将会遍历 registry 注册表,找出其中所有的过期租约,然后根据配置文件中续租百分比阈值和当前注册表的租约总数量计算出最大允许的剔除租约的数量(当前注册表中租约总数量减去当前注册表租约阈值),分批次剔除过期的服务实例租约。
对过期的服务实例租约调用 AbstractInstanceRegistry # internalCancel 服务下线的方法将其从注册表中清除掉。
为了保证 Eureka Server 的可用性,在服务剔除 evict 方法中有很多限制:
- 自我保护时期不能进行服务剔除操作。
- 过期操作是分批进行。
- 服务剔除是随机逐个剔除,剔除均匀分布在所有应用中,防止在同一时间内同意服务集群中的服务全部过期被剔除,以致大量剔除发生,在未进行自我保护前提促使程序崩溃。
服务剔除是一个定时的任务,所以 AbstractInstanceRegistry 中定义了一个 EvictionTask 用于定时执行服务剔除,默认 60 秒一次。
服务剔除的定时任务一般在 AbstractInstanceRegistry 初始化结束后进行,按照执行频率 evictionIntervalTimerInMs 的设定,定时剔除过期的服务实例租约。
自我保护机制主要在 Eureka Client 和 Eureka Server 之间存在网络分区的情况下发挥保护作用,在服务器端和客户端都有对应的实现。
假设在某种特定的情况下(网络故障…), Eureka Client 和 Eureka Server 无法进行通信,此使 Eureka Client 无法向 Eureka Server 发起祖册和续约请求,Eureka Server 中就可能因注册表中的服务实例租约出现大量过期而面临被剔除的危险,然而此使的 Eureka Client 可能是处于健康状态的(可接受服务访问),如果直接将注册表中大量过期的服务实例租约剔除掉先然是不合理的。
针对上面那种情况,Eureka 设计了”自我保护机制“。在 Eureka Server 处,如果出现大量服务实例过期被剔除的现象,那么改 Server 节点将进入自我保护模式,保护注册表中的信息不再被剔除,在通信稳定后在退出该模式;在Eureka Client 处,如果向 Eureka Server 注册失败,将快速超时并尝试于其他的 Eureka Server 进行通信。”自我保护机制“的设计大大提高了 Eureka 的可用性。
4.5 服务下线
Eureka Client 在应用销毁时,会向 Eureka Server 发送服务下线请求,清除注册表中关于本应用的租约,避免无效的服务调用。在服务剔除的过程中,也是通过服务下线的逻辑完成对单个服务实例过期租约的清楚工作。
服务下线的主要实现代码位于 AbstractInstanceRegistry # internalCancel 方法中,仅需要服务实例的服务名和服务实例的 id 即可完成服务下线。源代码如下:
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
read.lock();
CANCEL.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
synchronized (recentCanceledQueue) {
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
}
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
internalCancel 方法与 register 方法的行为过程很类似,首先通过 register 根据服务名和服务实例 id 查询关于服务实例的租约 Lease 是否存在,统计最近下线的服务实例用于 Eureka Server 主页展示。如果租约不存在,返回下线失败;如果租约存在,从 register 注册表中移除,设置租约的下线时间,同时在最近租约变更记录队列中添加新的下线记录,以用于 Eureka Client 的增量式获取注册表信息,最后设置 reposonse 缓存过期。
internalCancel 方法中同样通过读锁保证 register 注册表中数据的一致性,避免脏读。
4.6 集群同步
如果,Eurkea Server 是通过集群的方式进行部署,那么为了维护整个集群中 Eureka Server 注册表数据的一致性,势必需要一个机制同步 Eureka Server 集群中注册表的数据。
Eureka Server 集群同步包含两个部分:
- 一部分是 Eureka Server 在启动过程中从它的 peer 节点中拉取注册表信息,并将这些服务实例的信息注册到本地注册表中。
- 另一部分是 Eureka Server 每次对本地注册表进行操作时,同时会将操作同步到它的 peer 节点中,达到集群注册表数据统一的目的。
4.6.1 Eureka Server 初始化本地注册表信息
在 Eureka Server 启动的过程中,会从它的 peer 节点中拉取注册表来初始化本地注册表,这部分主要通过 PeerAwareInstanceRegistry # syncUp 方法完成,他将从可能存在 peer 节点中,拉取 peer 节点中的注册表信息,并将其中的服务实例信息注册到本地注册表中,源代码如下:
public int syncUp() {
int count = 0;
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
Eureka Server 也是一个 Eureka Client ,在启动的时候也会进行 DiscoveryClient 的初始化,会从其对应的 Eureka Server 中拉取全量的注册表信息。
在 Eureka Server 集群部署的情况下, Eureka Server 从它的 peer 节点中拉取到注册表信息后,将遍历这个 Applications,将所有的服务实例通过 AbstractRegistry # register 方法啊注册到自身注册表中。
在初始化本地注册表时,Eureka Server 并不会接受来自 Eureka Client 的通信请求(如注册、或者获取注册表信息等请求)。 在同步注册表信息结束后会通过 PeerAwareInstanceRegistryImpl # openForTraffic 方法允许该 Server 接受流量。源代码如下:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
this.expectedNumberOfRenewsPerMin = count * 2;
this.numberOfRenewsPerMinThreshold =
(int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
在 Eureka Server 中有一个 StatusFilter 过滤器,用于检查 Eureka Server 的状态,当 Server 的状态不为 UP 时,将拒绝所有的请求。
在 Client 请求获取注册表信息时,Server 会判断此时是否允许获取注册表中的信息。这种做法是为了避免 Eureka Server 在 syncUp 方法中没有获取到任何服务实例信息时(Eureka Server 集群部署的情况下),Eureka Server 注册表中的信息影响到 Eureka Client 缓存的注册表中的信息。
如果 Eureka Server 在 syncUp 方法中没有获取任何的服务实例信息,它将把 peerInstancesTransferEmptyOnStartup 设置为 true,这时该 Eureka Server 在 WaitTimeInMsWhenSyncEmpty(可以通过 eureka.server.wait-time-in-ms-when-sync-empty 设置,默认是 5 分钟)时间后才能被 Eureka Client 访问获取注册表信息。
4.6.2 Eureka Server 之间注册表信息的同步复制
为了保证 Eureka Server 集群运行时注册表信息的一致性,每个 Eureka Server 在对本地注册表进行管理操作时,会将相应的操作同步到所有 peer 节点中。
在 PeerAwareInstanceRegistryImpl 中,对 Abstractinstanceregistry 中的 register、cancel 和 renew 等方法都添加了同步到 peer 节点的操作,使 Server 集群中注册信息保持最终一致性,部分源代码如下:
下线
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
}
注册
public void register(final InstanceInfo info, final boolean isReplication) {
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
续约
public boolean renew(final String appName, final String id, final boolean isReplication) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
}
同步的主要操作有(枚举类):
public enum Action {
Heartbeat, Register, Cancel, StatusUpdate, DeleteStatusOverride;
}
每个同步方法都是调用如下方法:
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info ,
InstanceStatus newStatus , boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
peerEurekaNode 代表一个可同步共享数据的 Eureka Server。在 PeerEurekaNode中,具有 register、cancel、heartbeat 和 statueUpdate 等诸多用于向 peer 节点同步注册表信息的操作。
在 replicateInstanceActionsToPeers 方法中根据 action 的不同,调用 PeerEurekaNode 的不同方法进行同步复制,代码如下所示:
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
PeerEurekaNode 中的每一个同步复制都是通过比任务流的方式进行操作,同一时间段内相同服务实例的相同操作将使用相同的任务编号,在进行同步复制的时候根据任务编号合并操作,减少同步操作的数量和网络消耗,但是同步也造成同步复制的延时性,不满足 CAP 中的 C(强一致性)。
通过 Eureka Server 在启动过程中初始化本地注册表信息和 Eureka Server 集群间的同步复制操作,最终达到了集群中 Eureka Server 注册表信息一致的目的。
4.7 获取注册表中服务实例信息
Eureka Server 中获取注册表的服务实例信息主要通过两个方法实现:
AbstractInstanceRegistry # getApplicationsFromMultipleRegions 从多地区获取全量注册表数据AbstractInstanceRegistry # getApplicationDeltasFromMultipleRegions 从多地区获取增量式注册表数据
4.7.1 getApplicationsFromMultipleRegions
getApplicationsFromMultipleRegions 方法将会从多个地区中获取全量注册表信息,并封装成 Application 返回,源代码如下:
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
}
if (includeRemoteRegion) {
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
它首先会将本地注册表 register 中的所有服务实例信息提取出来封装到 Applications 中,再根据是否需要拉取远程 Regist 中的注册表信息,将远程 Region 的 Eureka Server 注册表中的服务实例信息添加到 Application 中。最后将封装了全量注册表信息的 Applications 返回给 Client。
4.7.2 getApplicationDeltasFromMultipleRegions
getApplicationDeltasFromMultipleRegions 方法将会从多个地区中获取增量式注册表信息,并封装成 Applications 返回,代码如下:
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions;
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
try {
write.lock();
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
while (iter.hasNext()) {
}
if (includeRemoteRegion) {
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
获取增量式注册表信息将会从 recentlyChangedQueue 中获取最近变化的服务实例信息。recentlyChangedQueue 中统计了进 3 分钟内进行注册、修改和剔除的服务实例信息,在服务注册 AbstractInstanceRegistry # registry 、接受心跳请求 AbstractInstanceRegistry # renew 和服务下线 AbstractInstanceRegistry # internalCancel 等方法中均可见到 recentlyChangedQueue 对这些服务实例进行登记,用于记录增量式注册表信息。getApplicationDeltasFromMultipleRegions 方法同样提供了从远程 Region 的 Eureka Server 获取增量式注册表信息的能力。
五、最后
上面对 Eureka Client 及 Eureka Server 的主要功能进行分析(读书笔记),篇幅应该说是非常的长了,所以本篇内容还是需要长时间的研读并结合源码去理解。
对于书中的后续内容“Eureka 进阶”就不打算放出来了,太耗费时间去整理了,如果大家感兴趣可以根据末尾的参考资料去阅读,这就都不多说明了哈!
好了,今天的内容到这里就结束了,关注我,我们下期见
参考资料
资料一:《SpringCloud微服务架构进阶》
资料二:https://www.jianshu.com/p/be18c452be2b
扩展阅读
- 由于博主才疏学浅,难免会有纰漏,假如你发现了错误或偏见的地方,还望留言给我指出来,我会对其加以修正。
- 如果你觉得文章还不错,你的转发、分享、点赞、留言就是对我最大的鼓励。
- 感谢您的阅读,十分欢迎并感谢您的关注。
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
CSDN:J3 - 白起
这是一个技术一般,但热衷于分享;经验尚浅,但脸皮够厚;明明年轻有颜值,但非要靠才华吃饭的程序员。
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|