目录
EurekaBootStrap
initEurekaEnvironment
initEurekaServerContext
ApplicationInfoManager
创建EurekaClient
创建应用实例信息的注册表
初始化EurekaServerContext
EurekaBootStrap
Eureka-Server?启动入口:该类实现了ServletContextListener,在 Servlet 容器( 例如 Tomcat、Jetty )启动时,调用?#contextInitialized() ?方法。
@Override
public void contextInitialized(ServletContextEvent event) {
try {
// 初始化eureka-server配置环境
initEurekaEnvironment();
// 初始化eureka-server上下文
initEurekaServerContext();
ServletContext sc = event.getServletContext();
sc.setAttribute(EurekaServerContext.class.getName(), serverContext);
} catch (Throwable e) {
logger.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
下面依次看下这两步具体操作有哪些
initEurekaEnvironment
protected void initEurekaEnvironment() throws Exception {
logger.info("Setting the eureka configuration..");
// 获取数据中心
String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER);
if (dataCenter == null) {
logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
} else {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
// 获取环境信息
String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
logger.info("Eureka environment value eureka.environment is not set, defaulting to test");
}
}
主要是初始化环境信息,没有很多内容,主要看下初始化上下文的步骤
initEurekaServerContext
protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
// 根据server配置,创建服务解码器
logger.info("Initializing the eureka client...");
logger.info(eurekaServerConfig.getJsonCodecName());
ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
ApplicationInfoManager applicationInfoManager = null;
// Eureka-Server 内嵌 Eureka-Client,用于和 Eureka-Server 集群里其他节点通信交互
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
/**
* eureka server 的eurekaClient本身也是个DiscoveryClient
*/
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
// 创建应用实例信息的注册表
PeerAwareInstanceRegistry registry;
if (isAws(applicationInfoManager.getInfo())) {
registry = new AwsInstanceRegistry(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
awsBinder = new AwsBinderDelegate(eurekaServerConfig, eurekaClient.getEurekaClientConfig(), registry, applicationInfoManager);
awsBinder.start();
} else {
registry = new PeerAwareInstanceRegistryImpl(
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
eurekaClient
);
}
// 创建 Eureka-Server 集群节点集合
PeerEurekaNodes peerEurekaNodes = getPeerEurekaNodes(
registry,
eurekaServerConfig,
eurekaClient.getEurekaClientConfig(),
serverCodecs,
applicationInfoManager
);
// 创建Eureka-Server上下文(提供初始化、关闭、获取等方法)
serverContext = new DefaultEurekaServerContext(
eurekaServerConfig,
serverCodecs,
registry,
peerEurekaNodes,
applicationInfoManager
);
// 初始化 EurekaServerContextHolder,使用它方便获取server的上下文
EurekaServerContextHolder.initialize(serverContext);
// 初始化时,其他server注册的client实例信息,通过创建remoteRegionRegistry时,创建线程发起http请求获得(应该也和其他的机制有关)
serverContext.initialize();
logger.info("Initialized server context");
// Copy registry from neighboring eureka node 从其他 Eureka-Server 拉取注册信息
int registryCount = registry.syncUp();
registry.openForTraffic(applicationInfoManager, registryCount);
// Register all monitoring statistics. 注册监控
EurekaMonitors.registerAllStats();
}
整个初始化EurekaServer的流程和细节点较多,一点点分析:
ApplicationInfoManager
注释1??:
首先是根据EurekaInstanceConfig、InstanceInfo来创建应用管理类,这两个类作为属性构成ApplicationInfoManager,因此主要看下InstanceInfo的创建代码,通过EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()来得到实例信息:
首先是根据配置文件的心跳间隔时间(默认30秒)、续约过期时间(默认90秒)来创建租约类的builder,租约类是服务发起续约,server过期应用的依据。后续是通过配置文件的设置数据来赋值实例属性,实例ID若有配置则读取配置,否则以主机名作为实例ID。
@Override
public synchronized InstanceInfo get() {
if (instanceInfo == null) {
// Build the lease information to be passed to the server based on config 根据eureka的Client配置构建要传递给服务器的租约信息
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
if (vipAddressResolver == null) {
vipAddressResolver = new Archaius1VipAddressResolver();
}
// Builder the instance information to be registered with eureka server
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder(vipAddressResolver);
// set the appropriate id for the InstanceInfo, falling back to datacenter Id if applicable, else hostname 设置instanceId,有若配置读取配置,否则以当前hostname作为实例ID
String instanceId = config.getInstanceId();
if (instanceId == null || instanceId.isEmpty()) {
DataCenterInfo dataCenterInfo = config.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
instanceId = ((UniqueIdentifier) dataCenterInfo).getId();
} else {
instanceId = config.getHostName(false);
}
}
// 设置客户端默认地址,主机名或ID地址
String defaultAddress;
if (config instanceof RefreshableInstanceConfig) {
// Refresh AWS data center info, and return up to date address
defaultAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(false);
} else {
defaultAddress = config.getHostName(false);
}
// fail safe
if (defaultAddress == null || defaultAddress.isEmpty()) {
defaultAddress = config.getIpAddress();
}
builder.setNamespace(config.getNamespace())
.setInstanceId(instanceId)
.setAppName(config.getAppname())
.setAppGroupName(config.getAppGroupName())
.setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress())
.setHostName(defaultAddress)
.setPort(config.getNonSecurePort())
.enablePort(PortType.UNSECURE, config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName())
.setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(), config.getStatusPageUrl())
.setASGName(config.getASGName())
.setHealthCheckUrls(config.getHealthCheckUrlPath(),
config.getHealthCheckUrl(), config.getSecureHealthCheckUrl());
// Start off with the STARTING state to avoid traffic
if (!config.isInstanceEnabledOnit()) {
InstanceStatus initialStatus = InstanceStatus.STARTING;
LOG.info("Setting initial instance status as: {}", initialStatus);
builder.setStatus(initialStatus);
} else {
LOG.info("Setting initial instance status as: {}. This may be too early for the instance to advertise "
+ "itself as available. You would instead want to control this via a healthcheck handler.",
InstanceStatus.UP);
}
// Add any user-specific metadata information 添加一些特定于客户端的元数据信息
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
}
return instanceInfo;
}
创建EurekaClient
紧接着通过eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);应用管理类和eurekaClient的配置来创建eurekaClient,可以看到和EurekaClient一样,也是通过创建DiscoveryClient类的方式,来将当前server实例注册、续约注册中心的,从这里也可以看出来,Server实例没有主次之分,每个Server也是将自己作为一个client来注册到注册中心上的。
创建应用实例信息的注册表
后续通过PeerAwareInstanceRegistryImpl进行注册表的创建
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
super(serverConfig, clientConfig, serverCodecs);
this.eurekaClient = eurekaClient;
this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
// We first check if the instance is STARTING or DOWN, then we check explicit overrides,
// then we check the status of a potentially existing lease.
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
通过调用父类的实现来初始化属性,然后创建实例状态覆盖规则,初始化时,赋予了三个覆盖规则,当都不满足时,返回默认覆盖规则执行结果。
覆盖状态的作用:
调用 Eureka-Server HTTP Restful 接口?apps/${APP_NAME}/${INSTANCE_ID}/status ?对应用实例覆盖状态的变更,从而达到主动的、强制的变更应用实例状态。注意,实际不会真的修改 Eureka-Client 应用实例的状态,而是修改在 Eureka-Server 注册的应用实例的状态。
通过这样的方式,Eureka-Client 在获取到注册信息时,并且配置?eureka.shouldFilterOnlyUpInstances = true ,过滤掉非?InstanceStatus.UP ?的应用实例,从而避免调动该实例,以达到应用实例的暂停服务(?InstanceStatus.OUT_OF_SERVICE ?),而无需关闭应用实例。
因此,大多数情况下,调用该接口的目的,将应用实例状态在 (?InstanceStatus.UP ?) 和 (?InstanceStatus.OUT_OF_SERVICE ?) 之间切换。
一、覆盖状态规则:
?下线或启动规则(DownOrStartingRule):当当前实例状态不处于UP、OUT_OF_SERVICE时,进行当前规则的执行,直接返回当前实例的状态作为覆盖状态,
public class DownOrStartingRule implements InstanceStatusOverrideRule {
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
/**
* 若实例状态不处于运行中、暂停服务,(启动中、下线),则不适合提供服务,不匹配
*/
if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
&& (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
logger.debug("Trusting the instance status {} from replica or instance for instance {}",
instanceInfo.getStatus(), instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
return StatusOverrideResult.NO_MATCH;
}
二、覆盖状态存在规则(OverrideExistsRule):若当前实例状态状态覆盖的数据,则使用已存在的覆盖状态作为当前覆盖状态
三、续约存在规则(LeaseExistsRule):非Server请求时,匹配已存在租约的应用实例的?nstanceStatus.OUT_OF_SERVICE ?或者?InstanceInfo.InstanceStatus.UP ?状态?
四、默认规则(AlwaysMatchInstanceStatusRule):总是返回当前实例的状态,来作为覆盖状态。
super(serverConfig, clientConfig, serverCodecs)
接着看下super里做了些什么,首先是赋值属性,创建最近取消、注册队列,定时任务设置getDeltaRetentionTask()的线程以配置的时间间隔(默认30秒)来定时执行。而getDeltaRetentionTask的run()方法,是遍历最近改变的队列信息,若队列内实例更新时间超过当前时间一定的时间段(默认三分钟),则从最近改变队列中移除,当client端发起对注册信息增量获取时,recentlyChangedQueue被用来计算最近时间的增量,返回给client端。
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
/**
* 30秒执行一次清理工作
*/
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
}
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
// 将更新时间超过当前3分钟的数据移出队列
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
后续是创建PeerEurekaNodes(Eureka-Server 集群节点集合)、创建EurekaServerContext(Eureka-Server上下文),将上下文放入holder方便获取,紧接着到了初始化上下文的节点,重点看下这个
初始化EurekaServerContext
初始化的代码,主要是做了两件事,启动刚刚创建好的server集群节点集合、初始化应用实例信息注册表,一个个来看。
public void initialize() {
logger.info("Initializing ...");
// 启动 Eureka-Server 集群节点集合(集群复制)
peerEurekaNodes.start();
try {
// 初始化 应用实例信息的注册表
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
启动Server集群节点集合
start():首先是创建一个定时任务,紧接着更新集群节点信息,创建一个实现run方法的类,run()的代码主要起到更新集群节点信息的作用,接着将该类交由定时任务每隔一定时间执行一次。这样就保证了初始化时,获取到server的集群信息,并每间隔一段时间发起请求去更新本地注册表server实例的用途。
public void start() {
// 创建定时任务
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 初始化集群节点信息
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
// 更新集群节点信息
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
/**
* 定时任务设置执行间隔(默认10分钟)
*/
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
下面主要看下,server是如何初始化、更新集群节点信息的
resolvePeerUrls():得到当前实例信息、可见区,然后通过getDiscoveryServiceUrls()根据DNS或者配置信息解析出所有的server服务URLS,除去当前server实例信息,就是其他所有的server服务。
protected List<String> resolvePeerUrls() {
/**
* 获取当前实例信息、可见区
*/
InstanceInfo myInfo = applicationInfoManager.getInfo();
String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo);
// 获取 eureka 客户端与之对话的所有 eureka 服务 url 的列表
List<String> replicaUrls = EndpointUtils
.getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo));
/**
* 去除本身URL,剩余是需要同步的
*/
int idx = 0;
while (idx < replicaUrls.size()) {
if (isThisMyUrl(replicaUrls.get(idx))) {
replicaUrls.remove(idx);
} else {
idx++;
}
}
return replicaUrls;
}
public static List<String> getDiscoveryServiceUrls(EurekaClientConfig clientConfig, String zone, ServiceUrlRandomizer randomizer) {
/**
* 根据DNS或者配置信息解析出所有server服务urls
*/
boolean shouldUseDns = clientConfig.shouldUseDnsForFetchingServiceUrls();
if (shouldUseDns) {
return getServiceUrlsFromDNS(clientConfig, zone, clientConfig.shouldPreferSameZoneEureka(), randomizer);
}
return getServiceUrlsFromConfig(clientConfig, zone, clientConfig.shouldPreferSameZoneEureka());
}
更新集群节点集合
上面已经得到最新的server节点集合,之前的server节点除去所有新server,剩余的就是要下线的server,最新server除去所有之前server节点集合,就是本次新增的server节点集合。
若本次没有新增、下线server节点,不进行操作。若下线节点集合存在,shutdown下线的eurekaNode,关闭node的线程池(集群同步,下文讲解);若新增节点集合存在,创建node节点,并添加到临时变量newNodeList中,后续赋值为当前server集群节点集合。首次初始化时,就是通过解析出的server全为新增节点,来完成peerEurekaNodes集合的属性填充。
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
/**
* 计算出需要下线、新增的服务列表
*/
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
/**
* 无需要下线、新增的,直接返回
*/
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
/**
* 下线删除
*/
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
/**
* 新增添加到peerEurekaNodes
*/
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
初始化 应用实例信息的注册表
根据刚刚创建好的集群节点集合去初始化注册信息:每一步的作用下面已有注释,下面分析下主要的代码
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 开始测速类定时任务,每分钟清空,实现测速
this.numberOfReplicationsLastMin.start();
// 属性赋值
this.peerEurekaNodes = peerEurekaNodes;
// 初始化响应缓存
initializedResponseCache();
// 通过定时任务,间隔一定时间,更新续约阈值
scheduleRenewalThresholdUpdateTask();
// 初始化远程的server注册信息
initRemoteRegionRegistry();
try {
// 监控注册
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}
初始化响应缓存
1、通过CacheBuilder创建默认1000大小,180秒过期的缓存数据
2、若当前实例允许使用readonly的缓存,那么开启定时任务,每隔30秒更新readOnlyCacheMap的数据,实现是遍历readOnlyCacheMap若value值和readWriteCacheMap的value值不同,则将readWriteCacheMap中最新的value值赋值到readOnly缓存中去,这样就保证了readOnly的数据不会存在太久的脏数据。
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
// guava缓存 默认容量1000,180秒过期
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
if (shouldUseReadOnlyResponseCache) {
// 初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs,设置任务执行频率,默认值 :30 * 1000 毫秒
// 负责更新readwriterMap中的数据
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
readOnly数据的填充,是当其他服务发起全量、增量、应用信息获取时,通过响应缓存读取数据,若readOnlyMap存在缓存则,直接返回,否则读取readWriterCacheMap,并将返回值赋值到readOnlyMap中,避免对readWriterCacheMap的频繁调用。
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
// 先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
// 过期后,重新设置(默认180秒过期一次)
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
// 读取 readWriteCacheMap
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
那么为什么要这样操作,避免对readWriterCacheMap频繁调用呢,readWriterCacheMap已经是记录三分钟、1000容量的缓存,当未命中到缓存时,需要通过generatePayload()方法,来获取到具体数据,具体代码见下面代码:
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});
// 服务有效负载
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
// 全量获取
tracer = serializeAllAppsTimer.start();
// 根据注册的实例集合和key构造需要缓存的实例数据
payload = getPayLoad(key, registry.getApplications());
}
} else if (ALL_APPS_DELTA.equals(key.getName())) {
// 增量(待完成)
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
根据不同的请求类型,来完成相应操作,类型较多,只以全量为例进行解析,其余实现类似:getPayLoad():通过server解码器,将数据解码为String,返回给client,忽略。getApplicationsFromMultipleRegions():增加监控数据,可忽略,后续读取当前server的注册表信息,将租约实例信息添加到临时变量apps中,此时当前server的所有实例信息已经得到,但是当前server可能因为网络原因,本地的注册表并不一定会是最全最新的注册信息,因此将本地缓存的其他server的注册表信息(server集群同步时缓存,下文讲解)也添加到apps中,循环远程注册表缓存数据,apps获取该app名称,若不存在,创建app信息,并添加所有实例,因是map、set数据结构来接收实例信息,所以不会存在多个实例重复注册后,数据重复的情况。得到所有apps后,所有实例状态及状态对应数据放入map中,通过hashmap的自然排序,生成应用集合的hashcode,hashcode例子:DOWN_2_UP_8_。
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
// 增加对应指令的监控数据
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
// 获取当前server的所有应用实例集合
Applications apps = new Applications();
apps.setVersion(1L);
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
app = new Application(lease.getHolder().getAppName());
}
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
apps.addApplication(app);
}
}
// 将其他活跃的server注册数据,也依次添加进集合中
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
// 猜测:应该是集群同步时,有其他集群的实例信息缓存?
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
// 设置 应用集合hashcode,可以用来后续匹配校验是否变更过(该变量用于校验增量获取的注册信息和 Eureka-Server 全量的注册信息是否一致( 完整 ))
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
更新续约阈值:
跳出来,接着说初始化注册表,接下来会通过定时任务,更新续约阈值。
通过定时任务,每隔十五分钟执行一次,进行重置当前server的自我保护阈值。首先是获取当前server的所有实例数值,当数据大于 预期客户端数*0.85,或者未开启自我自我保护机制时,更新预期客户端数值,变更每分钟续约最小阈值。当开启保护机制,若运行活跃实例数据小于预期客户端*0.85时,不进行操作,此处也是server自我保护机制的实现,server任务该时间段内过多服务下线,server自动进行自我保护机制。不修改预期客户端数值、每分钟续约阈值,那么当驱逐实例定时任务运行时,存在其他服务续约过期,server判断实例数过小时,保护当前注册表信息,不会进行驱逐操作(下面分析)。
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
try {
// 计算 应用实例数
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
// 若count>上次实例数*0.85,(server没进入自我保护机制)或者未开启自动保护时,更新实例数
// 进入自我保护机制后,会保护目前注册表的实例
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
// 未开启 自我保护机制配置
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}
未完明日更新...
|