前言
SpringCloud作为Spring家族的衍生平,扮演着微服务框架的重要角色。本篇主要涉及到SpringCloud的源代码中@LoadBalance注解是如何生效的。使用的开发环境中,注册中心使用的是nacos,负载均衡使用的是spring-cloud-loadbalancer,而不是ribbon,所以会涉及到一些关于nacos是怎么样为LoadBalancer提供服务支持的,但是主要还是分析loadbalance注解的工作流程。
SpringCloud的依赖包
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
<version>2021.0.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.6.3</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
依赖spring-cloud-starter-loadbalancer时,会引入spring-cloud-common这个jar包。
流程图
LoadBalance的前期准备
LoadBalancerAutoConfiguration类
spring-cloud-common这个jar包中有一个LoadBalancerAutoConfiguration类,这个类因为添加了@Configuration的注解,所以会被当做configuration的Bean对象进行初始化。但是细心的读者可以看到,这个Configuration还有一个@ConditionalOnBean(LoadBalancerClient.class)注解,这个注解会去找java程序是否有LoadBalancerClient接口的实现类。所以当我们在pom.xml文件引入spring-cloud-starter-loadbalancer类库或者ribbon的类库时,两者都是实现了LoadBalancerClient接口的,所以这个LoadBalancerAutoConfiguration类才会生效。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerClientsProperties.class)
public class LoadBalancerAutoConfiguration {
}
LoadBalancerClient接口和BlockingLoadBalancerClient类
接下来就到了BlockingLoadBalancerClient类实现LoadBalancerClient接口的研究了。LoadBalancerClient接口定义了三个方法,父接口ServiceInstanceChooser定义了两个方法,选择注册中心的服务,使用该服务处理request,返回response
public interface ServiceInstanceChooser {
ServiceInstance choose(String serviceId);
<T> ServiceInstance choose(String serviceId, Request<T> request);
}
public interface LoadBalancerClient extends ServiceInstanceChooser {
<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
URI reconstructURI(ServiceInstance instance, URI original);
}
下面是BlockingLoadBalancerClient 类的代码实现,省略了部分代码。choose方法是通过serviceId,拿到对应的微服务实例ServiceInstance,具体怎么拿到这个实例的,后面在讲。然后,execute方法使用服务实例request,输出response
public class BlockingLoadBalancerClient implements LoadBalancerClient {
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory;
public BlockingLoadBalancerClient(ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerClientFactory) {
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
@Override
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
...
ServiceInstance serviceInstance = choose(serviceId, lbRequest);
...
return execute(serviceId, serviceInstance, lbRequest);
}
@Override
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request)
throws IOException {
...
T response = request.apply(serviceInstance);
...
}
@Override
public <T> ServiceInstance choose(String serviceId, Request<T> request) {
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerClientFactory.getInstance(serviceId);
...
Response<ServiceInstance> loadBalancerResponse = Mono.from(loadBalancer.choose(request)).block();
...
return loadBalancerResponse.getServer();
}
}
LoadBalance获取ServiceInstance
上面说到了Choose方法里,通过serviceId可以拿到注册中心里的服务实例。Choose方法会调用 ReactiveLoadBalancer.Factory接口的getInstance(serviceId)方法,来获取注册中心的服务实例。 LoadBalancerClientFactory类实现了接口ReactiveLoadBalancer.Factory,它的getInstance方法是调用父类NamedContextFactory的getInstance方法,获取ReactorServiceInstanceLoadBalancer接口的实现Bean
public class LoadBalancerClientFactory extends NamedContextFactory<LoadBalancerClientSpecification>
implements ReactiveLoadBalancer.Factory<ServiceInstance> {
@Override
public ReactiveLoadBalancer<ServiceInstance> getInstance(String serviceId) {
return getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
}
}
可以看到RoundRobinLoadBalancer类 实现了ReactorServiceInstanceLoadBalancer 接口
@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
public class LoadBalancerClientConfiguration {
private static final int REACTIVE_SERVICE_INSTANCE_SUPPLIER_ORDER = 193827465;
@Bean
@ConditionalOnMissingBean
public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment,
LoadBalancerClientFactory loadBalancerClientFactory) {
String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
return new RoundRobinLoadBalancer(
loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
}
}
RoundRobinLoadBalancer
RoundRobinLoadBalancer.choose方法
上面的BlockingLoadBalancerClient.choose方法,会调用RoundRobinLoadBalancer类的choose方法
public class RoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {
@SuppressWarnings("rawtypes")
@Override
public Mono<Response<ServiceInstance>> choose(Request request) {
ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
.getIfAvailable(NoopServiceInstanceListSupplier::new);
return supplier.get(request).next()
.map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
}
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
int pos = Math.abs(this.position.incrementAndGet());
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
}
ServiceInstanceListSupplier 和 ServiceInstanceListSupplierBuilder
上面的RoundRobinLoadBalancer类的choose方法会先拿到ServiceInstanceListSupplier的实例,再用它调用processInstanceResponse方法。ServiceInstanceListSupplier的bean对象实例化有些复杂,注册的地方如下。
@Bean
@ConditionalOnBean(DiscoveryClient.class)
@ConditionalOnMissingBean
@Conditional(DefaultConfigurationCondition.class)
public ServiceInstanceListSupplier discoveryClientServiceInstanceListSupplier(
ConfigurableApplicationContext context) {
return ServiceInstanceListSupplier.builder().withBlockingDiscoveryClient().withCaching().build(context);
}
ServiceInstanceListSupplier先使用了builder模式,创建出ServiceInstanceListSupplierBuilder实例,然后定义了两个ServiceInstanceListSupplier,一个是用于缓存的CachingServiceInstanceListSupplier和一个是用于注册中心的DiscoveryClientServiceInstanceListSupplier。由于它们是lambda表达式,当build的时候,它们会使用Spring上下文ConfigurableApplicationContext,获取Spring上下文中的Bean实例。注意这里的CachingServiceInstanceListSupplier有一个next是指向DiscoveryClientServiceInstanceListSupplier的,当Caching拿不到结果,再从DiscoveryClient拿数据。
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {
public ServiceInstanceListSupplierBuilder withBlockingDiscoveryClient() {
if (baseCreator != null && LOG.isWarnEnabled()) {
LOG.warn("Overriding a previously set baseCreator with a blocking DiscoveryClient baseCreator.");
}
this.baseCreator = context -> {
DiscoveryClient discoveryClient = context.getBean(DiscoveryClient.class);
return new DiscoveryClientServiceInstanceListSupplier(discoveryClient, context.getEnvironment());
};
return this;
}
public ServiceInstanceListSupplierBuilder withCaching() {
if (cachingCreator != null && LOG.isWarnEnabled()) {
LOG.warn(
"Overriding a previously set cachingCreator with a CachingServiceInstanceListSupplier-based cachingCreator.");
}
this.cachingCreator = (context, delegate) -> {
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
if (cacheManagerProvider.getIfAvailable() != null) {
return new CachingServiceInstanceListSupplier(delegate, cacheManagerProvider.getIfAvailable());
}
if (LOG.isWarnEnabled()) {
LOG.warn("LoadBalancerCacheManager not available, returning delegate without caching.");
}
return delegate;
};
return this;
}
public ServiceInstanceListSupplier build(ConfigurableApplicationContext context) {
Assert.notNull(baseCreator, "A baseCreator must not be null");
ServiceInstanceListSupplier supplier = baseCreator.apply(context);
for (DelegateCreator creator : creators) {
supplier = creator.apply(context, supplier);
}
if (this.cachingCreator != null) {
supplier = this.cachingCreator.apply(context, supplier);
}
return supplier;
}
}
CompositeDiscoveryClient和NacosDiscoveryClient
CompositeDiscoveryClient的Bean声明,声明的地方会拿到所有的DiscoveryClient的Bean实例集合
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(SimpleDiscoveryClientAutoConfiguration.class)
public class CompositeDiscoveryClientAutoConfiguration {
@Bean
@Primary
public CompositeDiscoveryClient compositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
return new CompositeDiscoveryClient(discoveryClients);
}
}
CompositeDiscoveryClient调用getInstances方法,会去调用List集合中每个实例的getInstances方法,如果拿到结果就返回。
public class CompositeDiscoveryClient implements DiscoveryClient {
private final List<DiscoveryClient> discoveryClients;
public CompositeDiscoveryClient(List<DiscoveryClient> discoveryClients) {
AnnotationAwareOrderComparator.sort(discoveryClients);
this.discoveryClients = discoveryClients;
}
@Override
public List<ServiceInstance> getInstances(String serviceId) {
if (this.discoveryClients != null) {
for (DiscoveryClient discoveryClient : this.discoveryClients) {
List<ServiceInstance> instances = discoveryClient.getInstances(serviceId);
if (instances != null && !instances.isEmpty()) {
return instances;
}
}
}
return Collections.emptyList();
}
}
NacosDiscoveryClient 实现了DiscoveryClient 接口,所以会出现在上面的DiscoveryClient集合,就会触发getInstances方法。之后就是nacos客户端根据yaml文件中的配置去请求nacos服务端的api,拿到ServiceId对应的ServiceInstance集合了。因为篇幅原因就不讨论nacos的源代码了。
public class NacosDiscoveryClient implements DiscoveryClient {
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
return Optional.of(serviceDiscovery.getInstances(serviceId)).map(instances -> {
ServiceCache.setInstances(serviceId, instances);
return instances;
}).get();
}
catch (Exception e) {
if (failureToleranceEnabled) {
return ServiceCache.getInstances(serviceId);
}
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
}
RoundRobinLoadBalancer.processInstanceResponse方法
上面RoundRobinLoadBalancer.choose方法拿到ServiceInstance集合后,会调用processInstanceResponse方法,position记录了调用次数,通过求模,达到轮询服务列表的目的。
private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
List<ServiceInstance> serviceInstances) {
Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
}
return serviceInstanceResponse;
}
private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
if (instances.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("No servers available for service: " + serviceId);
}
return new EmptyResponse();
}
int pos = Math.abs(this.position.incrementAndGet());
ServiceInstance instance = instances.get(pos % instances.size());
return new DefaultResponse(instance);
}
RestTemplate
我们平时开发微服务时,注册RestTemplate会加上@LoadBalanced,那是因为自动装配是,RestTemplate的集合上也加了@LoadBalanced,而且@LoadBalanced是集成@Qualifier的,因此Spring在getBean的时候,会找加@LoadBalanced的RestTemplate的Bean。
public class LoadBalancerAutoConfiguration {
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
}
RestTemplate 在调用execute去访问注册中的微服务时,会使用父类InterceptingHttpAccessor的getRequestFactory方法,拿到一个有interceptors拦截器集合的InterceptingClientHttpRequestFactory
public abstract class InterceptingHttpAccessor extends HttpAccessor {
@Override
public ClientHttpRequestFactory getRequestFactory() {
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}
}
拦截器LoadBalancerInterceptor
下面的源代码来自LoadBalancerAutoConfiguration.java文件,可以看到loadBalancedRestTemplateInitializerDeprecated方法在Bean实例化时,去获取restTemplateCustomizers集合,遍历集合customizers,嵌套遍历restTemplates,调用RestTemplateCustomizer这个lamda的customize方法。于是LoadBalancerInterceptor实例就被加入到restTemplate的Interceptors里面了。这里的Interceptor在实例化时,是初始化了LoadBalancerRequestFactory 和BlockingLoadBalancerClient的
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();
@Bean
public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
return () -> restTemplateCustomizers.ifAvailable(customizers -> {
for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
for (RestTemplateCustomizer customizer : customizers) {
customizer.customize(restTemplate);
}
}
});
}
@Bean
@ConditionalOnMissingBean
public LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {
return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
}
@Bean
public LoadBalancerInterceptor loadBalancerInterceptor(LoadBalancerClient loadBalancerClient,
LoadBalancerRequestFactory requestFactory) {
return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
}
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}
RestTemplate发起http请求
当调用RestTemplate的http请求方法是,都会进入doExecute方法。于是就会通过createRequest方法创建一个ClientHttpRequest
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations {
@Nullable
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
...
ClientHttpRequest request = createRequest(url, method);
...
response = request.execute();
...
}
}
InterceptingClientHttpRequestFactory是上面的bean的一系列实例化生成的,它的createRequest方法实例化了一个InterceptingClientHttpRequest
public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {
@Override
protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
}
}
InterceptingClientHttpRequest 作为ClientHttpRequest执行executeInternal方法时,会遍历interceptors,调用intercept方法。于是就会进入LoadBalancerInterceptor的intercept方法
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {
private final List<ClientHttpRequestInterceptor> interceptors;
@Override
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
return requestExecution.execute(this, bufferedOutput);
}
private class InterceptingRequestExecution implements ClientHttpRequestExecution {
private final Iterator<ClientHttpRequestInterceptor> iterator;
@Override
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this);
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
}
LoadBalancerInterceptor.intercept调用上面讲的BlockingLoadBalancerClient.execute方法
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
@Override
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI();
String serviceName = originalUri.getHost();
Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));
}
}
BlockingLoadBalancerClient.execute方法,会触发LoadBalancerRequestFactory 生成的lamda的HttpRequest,传入instance。这里的instance包含了注册中心微服务实例的真实url地址。拿到真实地址就进入上面的InterceptingRequestExecution.execute的else部分。后面就不再赘述了。
public class LoadBalancerRequestFactory {
public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest, instance);
}
}
return execution.execute(serviceRequest, body);
};
}
}
|