EurekaHttpClientDecorator是一个抽象类,存在抽象方法:
protected abstract <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor);
每次调用 execute 都是通过 eurekaTransport.queryClient 发起的,其中 eurekaTransport.queryClient 的实际类型为 SessionedEurekaHttpClient。
继承自 EurekaHttpClientDecorator 的类有如下几个,都位于 com.netflix.discovery.shared.transport.decorator 包中:
- MetricsCollectingEurekaHttpClient。
- RedirectingEurekaHttpClient。
- RetryableEurekaHttpClient。
- SessionedEurekaHttpClient。
EurekaClientAutoConfiguration
入口
/**
 * Bean factory method that exposes the application's {@link EurekaClient}.
 *
 * <p>The bean is lazy, refresh-scoped, registered only when no other
 * {@code EurekaClient} exists in the current context, and has {@code shutdown}
 * declared as its destroy method.
 *
 * @param manager  application info manager handed to the client
 * @param config   Eureka client configuration handed to the client
 * @param instance instance configuration; not used by this method body
 * @return a new {@code CloudEurekaClient} built from the given manager and config
 */
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config,
        EurekaInstanceConfig instance) {
    // The result is intentionally discarded; the call is made only for its effect.
    // NOTE(review): presumably this forces the instance info to be initialized
    // before the client is built - confirm.
    manager.getInfo();
    final CloudEurekaClient cloudClient =
            new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
    return cloudClient;
}
DiscoveryClient
...
scheduleServerEndpointTask(eurekaTransport, args);
...
...
// Set up the registration transport only when this client is configured to
// register itself with Eureka.
if (clientConfig.shouldRegisterWithEureka()) {
// Both references stay null if the initialization below throws.
EurekaHttpClientFactory newRegistrationClientFactory = null;
EurekaHttpClient newRegistrationClient = null;
try {
// Build the factory from the bootstrap resolver, the low-level transport
// client factory and the transport configuration, then create one client from it.
newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
transportConfig
);
newRegistrationClient = newRegistrationClientFactory.newClient();
} catch (Exception e) {
// Failure is logged and not rethrown, so execution continues past this block.
logger.warn("Transport initialization failure", e);
}
// Published unconditionally: after a failure these fields may hold null.
eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
eurekaTransport.registrationClient = newRegistrationClient;
}
....
EurekaHttpClients
/**
 * Creates the factory that assembles the client decorator chain.
 *
 * <p>Each client produced by the returned factory is a
 * {@code SessionedEurekaHttpClient} whose inner factory is a
 * {@code RetryableEurekaHttpClient} factory, which in turn creates per-endpoint
 * clients through a {@code RedirectingEurekaHttpClient} factory wrapping
 * {@code transportClientFactory}.
 *
 * @param name                   name passed to the sessioned and retryable clients
 * @param transportConfig        supplies the session reconnect interval (in seconds)
 * @param clusterResolver        endpoint resolver; shut down with the returned factory
 * @param transportClientFactory factory of the underlying transport clients
 * @return a factory producing fully decorated clients
 */
static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                      final EurekaTransportConfig transportConfig,
                                                      final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                      final TransportClientFactory transportClientFactory) {
    return new EurekaHttpClientFactory() {
        @Override
        public EurekaHttpClient newClient() {
            return new SessionedEurekaHttpClient(
                    name,
                    RetryableEurekaHttpClient.createFactory(
                            name,
                            transportConfig,
                            clusterResolver,
                            RedirectingEurekaHttpClient.createFactory(transportClientFactory),
                            ServerStatusEvaluators.legacyEvaluator()),
                    // Multiply with a long literal so the seconds -> milliseconds
                    // conversion is done in 64-bit arithmetic and cannot overflow
                    // int for large configured intervals.
                    transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000L
            );
        }

        @Override
        public void shutdown() {
            // Only the cluster resolver is released here.
            wrapClosable(clusterResolver).shutdown();
        }
    };
}
SessionedEurekaHttpClient
/**
 * Creates a sessioned client.
 *
 * @param name              name stored on the client and used as the monitor registration id
 * @param clientFactory     factory stored for creating the wrapped client
 * @param sessionDurationMs base session duration in milliseconds; the duration of the
 *                          first session is derived from it via {@code randomizeSessionDuration}
 */
public SessionedEurekaHttpClient(String name,
                                 EurekaHttpClientFactory clientFactory,
                                 long sessionDurationMs) {
    this.sessionDurationMs = sessionDurationMs;
    this.clientFactory = clientFactory;
    this.name = name;
    // Computed only after the plain field assignments, as in the original ordering.
    this.currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
    // Registration stays last so the instance is fully initialized when published.
    Monitors.registerObject(name, this);
}
RetryableEurekaHttpClient
/**
 * Builds a factory of {@code RetryableEurekaHttpClient} instances.
 *
 * <p>Every client created by the returned factory is constructed with the
 * arguments given here and {@code DEFAULT_NUMBER_OF_RETRIES}. Shutting the
 * returned factory down shuts down {@code delegateFactory}.
 *
 * @param name                  client name
 * @param transportConfig       transport configuration passed to each client
 * @param clusterResolver       endpoint resolver passed to each client
 * @param delegateFactory       factory passed to each client for per-endpoint clients
 * @param serverStatusEvaluator evaluator passed to each client
 * @return the retryable client factory
 */
public static EurekaHttpClientFactory createFactory(final String name,
                                                    final EurekaTransportConfig transportConfig,
                                                    final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                    final TransportClientFactory delegateFactory,
                                                    final ServerStatusEvaluator serverStatusEvaluator) {
    final EurekaHttpClientFactory retryableFactory = new EurekaHttpClientFactory() {
        @Override
        public void shutdown() {
            delegateFactory.shutdown();
        }

        @Override
        public EurekaHttpClient newClient() {
            return new RetryableEurekaHttpClient(
                    name,
                    transportConfig,
                    clusterResolver,
                    delegateFactory,
                    serverStatusEvaluator,
                    DEFAULT_NUMBER_OF_RETRIES);
        }
    };
    return retryableFactory;
}
RedirectingEurekaHttpClient
/**
 * Wraps a transport client factory so that every client it hands out is a
 * {@code RedirectingEurekaHttpClient}.
 *
 * <p>A single {@code DnsServiceImpl} is created per call of this method and
 * shared by all clients the returned factory produces. Shutting the returned
 * factory down shuts down {@code delegateFactory}.
 *
 * @param delegateFactory factory passed to each redirecting client
 * @return the redirecting transport client factory
 */
public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) {
    final DnsServiceImpl sharedDnsService = new DnsServiceImpl();
    final TransportClientFactory redirectingFactory = new TransportClientFactory() {
        @Override
        public void shutdown() {
            delegateFactory.shutdown();
        }

        @Override
        public EurekaHttpClient newClient(EurekaEndpoint endpoint) {
            return new RedirectingEurekaHttpClient(
                    endpoint.getServiceUrl(),
                    delegateFactory,
                    sharedDnsService);
        }
    };
    return redirectingFactory;
}
调用方式
// Heartbeat sent through the registration client for this instance's app name and id;
// the last argument is passed as null.
eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
/**
 * Sends a heartbeat by wrapping the call in a {@code RequestExecutor} and
 * handing it to {@code execute}, which decides which delegate client runs it.
 *
 * @param appName          application name forwarded to the delegate
 * @param id               instance id forwarded to the delegate
 * @param info             instance info forwarded to the delegate
 * @param overriddenStatus overridden status forwarded to the delegate
 * @return the response produced by {@code execute}
 */
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(final String appName,
                                                      final String id,
                                                      final InstanceInfo info,
                                                      final InstanceStatus overriddenStatus) {
    final RequestExecutor<InstanceInfo> heartbeatRequest = new RequestExecutor<InstanceInfo>() {
        @Override
        public RequestType getRequestType() {
            return RequestType.SendHeartBeat;
        }

        @Override
        public EurekaHttpResponse<InstanceInfo> execute(EurekaHttpClient delegate) {
            return delegate.sendHeartBeat(appName, id, info, overriddenStatus);
        }
    };
    return execute(heartbeatRequest);
}
/**
 * Runs the request on the current session's client, first rotating the session
 * if it has lasted at least {@code currentSessionDurationMs}.
 *
 * <p>On rotation the reconnect timestamp is reset, a new randomized session
 * duration is picked, and the previously referenced client is shut down. If no
 * client is referenced afterwards, one is created from {@code clientFactory}
 * and installed through {@code TransportUtils.getOrSetAnotherClient}.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the response returned by the request executor
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    final long nowMs = System.currentTimeMillis();
    final long elapsedMs = nowMs - lastReconnectTimeStamp;
    final boolean sessionExpired = elapsedMs >= currentSessionDurationMs;

    if (sessionExpired) {
        logger.debug("Ending a session and starting anew");
        lastReconnectTimeStamp = nowMs;
        currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
        // Clear the reference and shut down whatever client it held.
        TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
    }

    EurekaHttpClient sessionClient = eurekaHttpClientRef.get();
    if (sessionClient == null) {
        final EurekaHttpClient freshClient = clientFactory.newClient();
        sessionClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, freshClient);
    }
    return requestExecutor.execute(sessionClient);
}
RetryableEurekaHttpClient#execute
/**
 * Runs the request, making up to {@code numberOfRetries} attempts.
 *
 * <p>Each attempt uses the client held in {@code delegate} if there is one,
 * otherwise a new client created for the next candidate endpoint. A response
 * accepted by {@code serverStatusEvaluator} is returned immediately and its
 * client is stored in {@code delegate}.
 *
 * <p>NOTE: this listing is an excerpt; the {@code ...} lines mark code that
 * was elided and is not visible here.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the first response accepted by the status evaluator
 * @throws TransportException when no attempt returns an accepted response
 */
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
// Index of the next candidate endpoint to try; advances each time a new client is created.
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
// (elided) NOTE(review): candidateHosts is null at declaration, so the omitted
// code presumably populates it before the get() below - confirm against the full source.
...
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
// Remember the client that worked so later requests reuse it.
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
} catch (Exception e) {
// NOTE(review): the exception is swallowed in this excerpt; the handler body
// may have been elided - confirm against the full source.
}
// (elided) per-attempt cleanup / bookkeeping not shown in this excerpt.
...
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
RedirectingEurekaHttpClient#execute
/**
 * Runs the request on the cached delegate client or, when none is cached, on a
 * newly created client for {@code serviceEndpoint} via {@code executeOnNewServer}.
 *
 * <p>With a cached client: a failure is logged, the cached reference is cleared
 * (only if it still points at that client), the client is shut down and the
 * exception is rethrown.
 *
 * <p>Without a cached client: on success the client left in the local reference
 * becomes the new delegate and the previous delegate value is shut down; on
 * failure the client in the local reference is shut down and the exception is
 * rethrown.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the response of the executed request
 */
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    final EurekaHttpClient cachedClient = delegateRef.get();

    if (cachedClient != null) {
        try {
            return requestExecutor.execute(cachedClient);
        } catch (Exception e) {
            logger.error("Request execution error", e);
            delegateRef.compareAndSet(cachedClient, null);
            cachedClient.shutdown();
            throw e;
        }
    }

    // No cached delegate: start from the configured service endpoint.
    final AtomicReference<EurekaHttpClient> candidateRef =
            new AtomicReference<>(factory.newClient(serviceEndpoint));
    try {
        final EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, candidateRef);
        final EurekaHttpClient previous = delegateRef.getAndSet(candidateRef.get());
        TransportUtils.shutdown(previous);
        return response;
    } catch (Exception e) {
        logger.error("Request execution error", e);
        TransportUtils.shutdown(candidateRef.get());
        throw e;
    }
}
MetricsCollectingEurekaHttpClient#execute
/**
 * Runs the request on the delegate while recording metrics for its request type.
 *
 * <p>The latency timer is started before the call and always stopped afterwards.
 * A returned response increments the counter for its mapped status; an exception
 * increments the connection-error counter, is recorded in the exceptions metric
 * and is rethrown.
 *
 * @param requestExecutor the request to run
 * @param <R>             response entity type
 * @return the delegate's response
 */
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
    final EurekaHttpClientRequestMetrics metrics =
            metricsByRequestType.get(requestExecutor.getRequestType());
    final Stopwatch latencyWatch = metrics.latencyTimer.start();
    try {
        final EurekaHttpResponse<R> response = requestExecutor.execute(delegate);
        metrics.countersByStatus.get(mappedStatus(response)).increment();
        return response;
    } catch (Exception failure) {
        metrics.connectionErrors.increment();
        exceptionsMetric.count(failure);
        throw failure;
    } finally {
        latencyWatch.stop();
    }
}
|