一.背景
最近在做一个项目,需要接入2个ES集群,于是我初始化了2个RestHighLevelClient实例esClient和esClient1
package com.xxx.common.config;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EsConfig {
@Value("${es.url}")
private String url;
@Value("${es.port}")
private Integer port;
@Value("${es.username}")
private String username;
@Value("${es.password}")
private String password;
@Value("${es.connection.timeout:30000}")
private int connctionTimeout;
@Value("${es.socket.timeout:60000}")
private int socketTimeout;
@Bean
public RestHighLevelClient esClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
.setSocketTimeout(socketTimeout));
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
}
结果启动的时候报错了,如下:
java.net.ConnectException: Connection refused
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestHealthIndicator.doHealthCheck(ElasticsearchRestHealthIndicator.java:60)
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71)
at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74)
at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65)
at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77)
at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121)
at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
... 1 common frames omitted
?二.分析原因
为什么只有一个client不会报错呢,如果有2个client自检报错,而且client 里面url是【http://localhost:9200】,待着这样的疑问,我看了一遍ES自检的源码
ElasticsearchRestHealthIndicator
里面主要方法是这个
public ElasticsearchRestHealthIndicator(RestClient client) {
super("Elasticsearch health check failed");
this.client = client;
this.jsonParser = JsonParserFactory.getJsonParser();
}
protected void doHealthCheck(Builder builder) throws Exception {
Response response = this.client.performRequest(new Request("GET", "/_cluster/health/"));
StatusLine statusLine = response.getStatusLine();
if (statusLine.getStatusCode() != 200) {
builder.down();
builder.withDetail("statusCode", statusLine.getStatusCode());
builder.withDetail("reasonPhrase", statusLine.getReasonPhrase());
} else {
InputStream inputStream = response.getEntity().getContent();
Throwable var5 = null;
try {
this.doHealthCheck(builder, StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8));
} catch (Throwable var14) {
var5 = var14;
throw var14;
} finally {
if (inputStream != null) {
if (var5 != null) {
try {
inputStream.close();
} catch (Throwable var13) {
var5.addSuppressed(var13);
}
} else {
inputStream.close();
}
}
}
}
}
报错原因是client配置里的url不对指向了【http://localhost:9200】,于是好奇心又让我看了一下es自动配置源码
ElasticsearchRestClientAutoConfiguration
ElasticsearchRestClientConfigurations
ElasticsearchRestClientProperties
RestClientBuilderCustomizer
在ElasticsearchRestClientConfigurations发现了猫腻,如下代码
@Bean
@ConditionalOnMissingBean
RestClient elasticsearchRestClient(RestClientBuilder builder, ObjectProvider<RestHighLevelClient> restHighLevelClient) {
RestHighLevelClient client = (RestHighLevelClient)restHighLevelClient.getIfUnique();
return client != null ? client.getLowLevelClient() : builder.build();
}
这段代码表示,如果当前应用存在多个RestHighLevelClient 实例,则会取RestClientBuilder,如果只有一个实例,择取默认的那个。这也是解释了一个不报错,而多个的客户端存在的话,则取到了ElasticsearchRestClientProperties默认的配置
private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200"));
private String username;
private String password;
private Duration connectionTimeout = Duration.ofSeconds(1L);
private Duration readTimeout = Duration.ofSeconds(30L);
三.解决方法
1.如果存在多个ES去掉自检(不推荐)
management.health.elasticsearch.enabled=false
2.配置默认RestClientBuilder,让他自检默认ES
第一种方法:名字保持和默认的相同restHighLevelClient
@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
.setSocketTimeout(socketTimeout));
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
return restHighLevelClient;
}
第二种方法:设置@Primary,让他作为默认的
@Bean
@Primary
public RestHighLevelClient esClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
.setSocketTimeout(socketTimeout));
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
3.重些ES自检程序(让多个ES集群都进行自检)
这块感兴趣的可以是去扩展,让2个集群都实现健康检查
|