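Spring Cloud Eureka (2): Client Source Code
Entry Point
In the Spring Cloud client project set up in the previous article, adding a few lines to the configuration file was enough for the Eureka client to start. So how does the client get wired into Spring? To register beans that live outside Spring's component-scan path, there are only two options:
- Use @Configuration or @Import
- Use the Spring Boot SPI mechanism and list the configuration classes in the spring.factories file
Open the META-INF/spring.factories file in spring-cloud-netflix-eureka-client.jar: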
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.DiscoveryClientOptionalArgsConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaConfigServerBootstrapConfiguration
These configuration classes register the beans Eureka needs, which makes them a good starting point for locating the important classes.
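To make the file format concrete, here is a minimal sketch of how such a file can be read: each key maps to a comma-separated list of class names, and the trailing backslashes are ordinary properties-file line continuations. This is not Spring's actual loader; `FactoriesSketch` and `classNames` are hypothetical names, and only `java.util.Properties` from the JDK is used.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Spring: parses spring.factories-style text.
class FactoriesSketch {
    static final String AUTO_CONFIG_KEY =
            "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    /** Returns the class names listed under the given key, in file order. */
    static List<String> classNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines that end with a backslash
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Arrays.stream(props.getProperty(key, "").split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }
}
```

Spring Boot then treats every class name found under the auto-configuration key as a candidate configuration class, subject to the conditions declared on it.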
EurekaDiscoveryClient
The client's most important job is discovering services from the server. Spring Cloud defines a top-level interface for service discovery, org.springframework.cloud.client.discovery.DiscoveryClient:
public interface DiscoveryClient extends Ordered {
String description();
List<ServiceInstance> getInstances(String serviceId);
List<String> getServices();
}
Eureka provides its own implementation, registered in EurekaDiscoveryClientConfiguration, one of the configuration classes listed in spring.factories:
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
public class EurekaDiscoveryClientConfiguration {
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client,
EurekaClientConfig clientConfig) {
return new EurekaDiscoveryClient(client, clientConfig);
}
}
Registering this bean requires two other beans: EurekaClient and EurekaClientConfig.
EurekaClientConfig
@ImplementedBy(DefaultEurekaClientConfig.class)
public interface EurekaClientConfig {...}
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig, Ordered {
public static final String PREFIX = "eureka.client";
}
EurekaClientConfigBean is registered in EurekaClientAutoConfiguration, which is also listed in spring.factories:
public class EurekaClientAutoConfiguration {
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class,
search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
return new EurekaClientConfigBean();
}
}
EurekaClient
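The most derived implementation of EurekaClient is CloudEurekaClient. That class merely publishes a heartbeat event whenever onCacheRefreshed is called; the real work is done by com.netflix.discovery.DiscoveryClient.
// Key methods only; bodies omitted
public class DiscoveryClient implements EurekaClient {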
boolean register();
boolean renew();
synchronized void shutdown();
Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
}
Let's take a closer look at DiscoveryClient.
DiscoveryClient
Start with the constructor:
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
return;
}
try {
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
);
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
);
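// ... (transport setup and other initialization elided)
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}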
if (clientConfig.shouldFetchRegistry()) {
try {
boolean primaryFetchRegistryResult = fetchRegistry(false);
if (!primaryFetchRegistryResult) {
logger.info("Initial registry fetch from primary servers failed");
}
boolean backupFetchRegistryResult = true;
if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
backupFetchRegistryResult = false;
logger.info("Initial registry fetch from backup servers failed");
}
if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
}
} catch (Throwable th) {
logger.error("Fetch registry error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
initScheduledTasks();
}
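Constructor summary:
- Initialize basic state (the scheduler plus the heartbeat and cache-refresh thread pools)
- Fetch the registry, falling back to the backup registry if the primary servers fail
- Register itself, but only when shouldEnforceRegistrationAtInit() is true; otherwise registration is left to the scheduled tasks
- Initialize the scheduled tasks: heartbeat, registry fetch, and instance info replication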
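The registry-fetch step in the constructor is a small decision tree: try the primary servers, consult the backup only if that fails, and abort startup only if both fail and enforcement is switched on. The sketch below isolates that logic; `InitialFetchSketch` and its parameters are hypothetical stand-ins for fetchRegistry(false), fetchRegistryFromBackup() and shouldEnforceFetchRegistryAtInit().

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of the startup fetch decision in the constructor.
class InitialFetchSketch {
    /**
     * Returns true if registry data was obtained from either source.
     * Throws only when both sources fail and enforceAtInit is true.
     */
    static boolean initialFetch(BooleanSupplier primary,
                                BooleanSupplier backup,
                                boolean enforceAtInit) {
        boolean primaryOk = primary.getAsBoolean();
        // The backup is consulted only after a primary failure (short-circuit ||)
        boolean anyOk = primaryOk || backup.getAsBoolean();
        if (!anyOk && enforceAtInit) {
            throw new IllegalStateException(
                    "Fetch registry error at startup. Initial fetch failed.");
        }
        return anyOk;
    }
}
```

With enforcement off, a client whose fetches all fail still starts, and the periodic cache refresh gets another chance later.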
fetchRegistry: fetching the registry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1))
{
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
onCacheRefreshed();
updateInstanceRemoteStatus();
return true;
}
@Override
public Applications getApplications() {
return localRegionApps.get();
}
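The condition at the top of fetchRegistry decides between a full fetch and a delta fetch. Restated as a standalone predicate, it might look like the sketch below; `FetchModeSketch` and its parameter names are hypothetical, and a null `cachedAppCount` stands in for "no local Applications object yet".

```java
// Hypothetical restatement of the full-vs-delta decision in fetchRegistry.
class FetchModeSketch {
    /**
     * @param cachedAppCount applications in the local cache, or null if no cache exists
     * @param cachedVersion  version of the local cache; -1 means "never populated"
     */
    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  Integer cachedAppCount,
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }
}
```

In other words, a delta fetch is attempted only when a non-empty cache already exists and nothing forces a full reload.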
getAndStoreFullRegistry: full registry fetch
private final AtomicLong fetchRegistryGeneration;
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
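The fetchRegistryGeneration counter is what keeps a slow, stale fetch from overwriting a newer result: each fetch records the generation when it starts and may publish only if the generation is unchanged when it finishes. A minimal sketch of that compare-and-set guard, with hypothetical names and a String standing in for Applications:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of the generation guard used when storing a fetched registry.
class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<String> cache = new AtomicReference<>("empty");

    /** Step 1: remember which generation was current when the fetch started. */
    long beginFetch() {
        return generation.get();
    }

    /** Step 2: publish only if no other fetch completed since beginFetch(). */
    boolean publish(long startedAt, String fetched) {
        if (generation.compareAndSet(startedAt, startedAt + 1)) {
            cache.set(fetched);
            return true;
        }
        return false; // a newer result is already in place; drop the stale one
    }

    String cached() {
        return cache.get();
    }
}
```

If two fetches start at the same generation, whichever finishes first wins and the other is discarded, which matches the "another thread is updating it already" warning in the method above.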
AbstractJerseyEurekaHttpClient
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
return getApplicationsInternal("apps/", regions);
}
@Override
public EurekaHttpResponse<Applications> getVip(String vipAddress, String... regions) {
return getApplicationsInternal("vips/" + vipAddress, regions);
}
@Override
public EurekaHttpResponse<Applications> getDelta(String... regions) {
return getApplicationsInternal("apps/delta", regions);
}
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
// ... (body elided: issues an HTTP GET against urlPath and wraps the response)
}
}
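The three query paths above are plain GET requests relative to the Eureka service URL. As a rough illustration, the sketch below builds (but does not send) such requests with the JDK 11+ `java.net.http` API rather than Jersey. The class name, the example base URL and the `Accept: application/json` header are assumptions made for illustration, not something taken from the source above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical request builder for the query paths shown above (requires Java 11+).
class RegistryQuerySketch {
    /** Builds, without sending, a GET for serviceUrl + urlPath. */
    static HttpRequest query(String serviceUrl, String urlPath) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        return HttpRequest.newBuilder()
                .uri(URI.create(base + urlPath))
                .header("Accept", "application/json") // assumed content negotiation
                .GET()
                .build();
    }

    static HttpRequest fullRegistry(String serviceUrl) {
        return query(serviceUrl, "apps/");
    }

    static HttpRequest delta(String serviceUrl) {
        return query(serviceUrl, "apps/delta");
    }

    static HttpRequest vip(String serviceUrl, String vipAddress) {
        return query(serviceUrl, "vips/" + vipAddress);
    }
}
```

For example, `RegistryQuerySketch.delta("http://localhost:8761/eureka")` targets `http://localhost:8761/eureka/apps/delta`.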
getAndUpdateDelta: incremental (delta) registry fetch
private final Lock fetchRegistryUpdateLock = new ReentrantLock();
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode);
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
private void updateDelta(Applications delta) {
for (Application app : delta.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
Applications applications = getApplications();
if (ActionType.ADDED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
else if (ActionType.MODIFIED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp == null) {
applications.addApplication(app);
}
logger.debug("Modified instance {} to the existing apps ", instance.getId());
applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
}
else if (ActionType.DELETED.equals(instance.getActionType())) {
Application existingApp = applications.getRegisteredApplications(instance.getAppName());
if (existingApp != null) {
logger.debug("Deleted instance {} to the existing apps ", instance.getId());
existingApp.removeInstance(instance);
if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
applications.removeApplication(existingApp);
}
}
}
}
}
}
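Stripped of logging, updateDelta is an upsert for ADDED and MODIFIED and a removal for DELETED, followed by a hash comparison to detect drift from the server. The sketch below models this on plain maps. All names are hypothetical, and the hash format (a sorted list of status counts such as `DOWN_1_UP_1_`) is an assumption about what getReconcileHashCode produces, since that method is not shown above.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of applying a delta to a local registry cache.
class DeltaSketch {
    enum Action { ADDED, MODIFIED, DELETED }

    /** One change reported by the server. */
    static final class Change {
        final Action action;
        final String app;
        final String instanceId;
        final String status;

        Change(Action action, String app, String instanceId, String status) {
            this.action = action;
            this.app = app;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    /** Local cache: app name -> (instance id -> status). */
    final Map<String, Map<String, String>> apps = new TreeMap<>();

    void apply(Iterable<Change> delta) {
        for (Change c : delta) {
            if (c.action == Action.DELETED) {
                Map<String, String> instances = apps.get(c.app);
                if (instances != null) {
                    instances.remove(c.instanceId);
                    if (instances.isEmpty()) {
                        apps.remove(c.app); // drop apps with no instances left
                    }
                }
            } else {
                // ADDED and MODIFIED are handled the same way: upsert the instance
                apps.computeIfAbsent(c.app, k -> new TreeMap<>())
                        .put(c.instanceId, c.status);
            }
        }
    }

    /** Status histogram in sorted order, e.g. "DOWN_1_UP_1_" (assumed format). */
    String reconcileHash() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map<String, String> instances : apps.values()) {
            for (String status : instances.values()) {
                counts.merge(status, 1, Integer::sum);
            }
        }
        StringBuilder sb = new StringBuilder();
        counts.forEach((status, n) -> sb.append(status).append('_').append(n).append('_'));
        return sb.toString();
    }
}
```

If the locally computed hash differs from the one the server sent with the delta, the local cache has drifted and needs to be reconciled, which is what the reconcileAndLogDifference call above handles.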
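register: registering itself
boolean register() throws Throwable {
EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
// The server answers a successful registration with 204 No Content
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}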
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
// ... (body elided: sends an HTTP POST with the InstanceInfo to urlPath)
}
}
initScheduledTasks: initializing the scheduled tasks
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2);
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (statusChangeEvent.getStatus() == InstanceStatus.DOWN) {
logger.error("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
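Both periodic tasks are wrapped in a TimedSupervisorTask together with an expBackOffBound. That class is not shown here, so the following is only a simplified model of the behavior as I understand it: a timed-out run doubles the delay before the next run, capped at the base interval times the bound, and a successful run resets the delay to the base interval. `BackoffSketch` is a hypothetical name.

```java
// Simplified, hypothetical model of the delay handling in TimedSupervisorTask.
class BackoffSketch {
    private final long baseDelayMs;
    private final long maxDelayMs;
    private long currentDelayMs;

    BackoffSketch(long baseDelayMs, int expBackOffBound) {
        this.baseDelayMs = baseDelayMs;
        this.maxDelayMs = baseDelayMs * expBackOffBound; // upper limit for the backoff
        this.currentDelayMs = baseDelayMs;
    }

    /** A successful run resets the delay to the configured interval. */
    long onSuccess() {
        currentDelayMs = baseDelayMs;
        return currentDelayMs;
    }

    /** A timed-out run doubles the delay, capped at maxDelayMs. */
    long onTimeout() {
        currentDelayMs = Math.min(maxDelayMs, currentDelayMs * 2);
        return currentDelayMs;
    }
}
```

With a 30 second interval and a bound of 10, repeated timeouts would stretch the delay to 60, 120 and 240 seconds and then hold it at 300 seconds until a run succeeds.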
CacheRefreshThread: registry fetch thread
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
@VisibleForTesting
void refreshRegistry() {
try {
boolean success = fetchRegistry(remoteRegionsModified);
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
HeartbeatThread: heartbeat thread
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}
// In AbstractJerseyEurekaHttpClient
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
String urlPath = "apps/" + appName + '/' + id;
// ... (body elided: sends an HTTP PUT to urlPath)
}
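The interesting part of renew() is the 404 branch: if the server no longer knows the instance (for example after a server restart or a lease expiry), the heartbeat turns into a fresh registration. A minimal sketch of that control flow, using hypothetical functional parameters in place of the real HTTP calls:

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical model of the heartbeat / re-register decision in renew().
class RenewSketch {
    /**
     * @param sendHeartbeat returns the HTTP status code of the heartbeat call
     * @param register      performs a registration and reports success
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                return register.getAsBoolean(); // server forgot us: register again
            }
            return status == 200;
        } catch (RuntimeException e) {
            return false; // transport failure: the next scheduled heartbeat retries
        }
    }
}
```

Any other status, or an exception, simply counts as a failed heartbeat; nothing is retried immediately.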
InstanceInfoReplicator: instance info refresh thread
class InstanceInfoReplicator implements Runnable {
public void run() {
try {
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
}
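The replicator passes the dirty timestamp back to unsetIsDirty rather than simply clearing a flag. The likely reason is a race: if the instance info changes again while register() is in flight, that newer change must not be lost. The sketch below models such a timestamped dirty flag. It is an assumption about how InstanceInfo behaves, since that class is not shown above, and all names are hypothetical.

```java
// Hypothetical model of a timestamped dirty flag, as used by the replicator loop.
class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the instance info as changed at the given time. */
    synchronized void setDirty(long now) {
        dirty = true;
        lastDirtyTimestamp = now;
    }

    /** Returns the timestamp of the pending change, or null if nothing is pending. */
    synchronized Long dirtyWithTime() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if no newer change arrived after sentTimestamp. */
    synchronized void unsetDirty(long sentTimestamp) {
        if (lastDirtyTimestamp <= sentTimestamp) {
            dirty = false;
        }
    }
}
```

If a change lands between reading the timestamp and clearing it, the flag stays set and the next run registers again.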
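Scheduled task summary
- Registry fetch: the interval is set by eureka.client.registry-fetch-interval-seconds (default 30s)
- Heartbeat (lease renewal): the interval is set by eureka.instance.lease-renewal-interval-in-seconds (default 30s)
- Instance info refresh: the interval is set by eureka.client.instance-info-replication-interval-seconds (default 30s)
- Status change listener: listens for status change events and triggers an immediate instance info refresh, making it the on-demand counterpart of the periodic refresh; enabled by eureka.client.on-demand-update-status-change (default true)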
shutdown: taking the service offline
@PreDestroy
@Override
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
cancelScheduledTasks();
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
unregister();
}
logger.info("Completed shut down of DiscoveryClient");
}
}
void unregister() {
eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
}
public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
@Override
public EurekaHttpResponse<Void> cancel(String appName, String id) {
String urlPath = "apps/" + appName + '/' + id;
// ... (body elided: sends an HTTP DELETE to urlPath)
}
}
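shutdown() can be reached more than once (for instance through @PreDestroy and an explicit call), so it relies on an AtomicBoolean compare-and-set to run its body exactly once. The following sketch records the steps instead of performing them. The names are hypothetical, and the DELETE verb for cancel is an assumption based on Eureka's REST conventions, since the method body is elided above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the run-once shutdown sequence.
class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    final List<String> steps = new ArrayList<>();

    void shutdown(boolean unregisterOnShutdown) {
        if (!isShutdown.compareAndSet(false, true)) {
            return; // a previous call already ran the shutdown sequence
        }
        steps.add("unregister status change listener");
        steps.add("cancel scheduled tasks");
        if (unregisterOnShutdown) {
            steps.add("set instance status DOWN");
            steps.add("DELETE apps/{appName}/{id}"); // assumed HTTP verb for cancel
        }
    }
}
```

Calling `shutdown(true)` twice leaves four recorded steps, not eight.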
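Summary
This article started from the DiscoveryClient constructor and walked through the Eureka client's workflow: fetching the registry, registering itself, renewing its lease through heartbeats, and so on. Along the way we saw the source code where several configuration properties take effect, as well as the HTTP paths used to talk to the server. With this understanding of how the client works, it is entirely feasible to implement a client in another language.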
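As a starting point for such a client, the paths seen in this article can be collected in one place. The path templates below come from the code above; the HTTP verbs are my understanding of Eureka's REST conventions rather than something shown in the excerpts, and `EurekaEndpointsSketch` is a hypothetical name.

```java
// Hypothetical summary of the client-to-server operations covered in this article.
class EurekaEndpointsSketch {
    enum Op {
        REGISTER("POST", "apps/%s"),        // args: appName
        HEARTBEAT("PUT", "apps/%s/%s"),     // args: appName, instanceId
        CANCEL("DELETE", "apps/%s/%s"),     // args: appName, instanceId
        FULL_REGISTRY("GET", "apps/"),
        DELTA("GET", "apps/delta"),
        VIP("GET", "vips/%s");              // args: vipAddress

        final String method;   // assumed HTTP verb
        final String template; // path relative to the Eureka service URL

        Op(String method, String template) {
            this.method = method;
            this.template = template;
        }

        /** Fills the path template with the given arguments. */
        String path(Object... args) {
            return String.format(template, args);
        }
    }
}
```

For example, `EurekaEndpointsSketch.Op.HEARTBEAT.path("ORDERS", "host-1")` yields `apps/ORDERS/host-1`.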