一.背景
最近在做一个项目,需要接入2个ES集群,于是我初始化了2个RestHighLevelClient实例esClient和esClient1
package com.xxx.common.config; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class EsConfig { @Value("${es.url}") private String url; @Value("${es.port}") private Integer port; @Value("${es.username}") private String username; @Value("${es.password}") private String password; @Value("${es.connection.timeout:30000}") private int connctionTimeout; @Value("${es.socket.timeout:60000}") private int socketTimeout; @Bean public RestHighLevelClient esClient() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); RestClientBuilder builder = RestClient.builder(new HttpHost(url, port)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout) .setSocketTimeout(socketTimeout)); RestHighLevelClient client = new RestHighLevelClient(builder); return client; } }
结果启动的时候报错了,如下:
java.net.ConnectException: Connection refused at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestHealthIndicator.doHealthCheck(ElasticsearchRestHealthIndicator.java:60) at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82) at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71) at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99) at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110) at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74) at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65) at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282) at 
org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77) at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121) at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96) at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819) at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801) at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468) at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829) at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) at sun.rmi.transport.Transport$1.run(Transport.java:200) at sun.rmi.transport.Transport$1.run(Transport.java:197) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.Transport.serviceCall(Transport.java:196) at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688) at java.security.AccessController.doPrivileged(Native Method) at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174) at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148) at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351) at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) ... 1 common frames omitted
二.分析原因
为什么只有一个client时不会报错,而有2个client时自检就会报错,并且自检所用client的url是【http://localhost:9200】呢?带着这样的疑问,我看了一遍ES自检的源码
ElasticsearchRestHealthIndicator
里面主要方法是这个
/**
 * Creates the indicator around the given low-level client, which is the client
 * every health check request is sent through.
 *
 * @param client the low-level REST client used by {@code doHealthCheck}
 */
public ElasticsearchRestHealthIndicator(RestClient client) {
    super("Elasticsearch health check failed");
    this.jsonParser = JsonParserFactory.getJsonParser();
    this.client = client;
}

/**
 * Issues {@code GET /_cluster/health/} through the wrapped client. A non-200 status
 * marks the health as down and records the status code and reason phrase; otherwise
 * the response body is read as UTF-8 and handed to the String-based overload.
 *
 * @param builder the health builder to populate
 * @throws Exception if the request or reading the response fails
 */
protected void doHealthCheck(Builder builder) throws Exception {
    Response response = this.client.performRequest(new Request("GET", "/_cluster/health/"));
    StatusLine statusLine = response.getStatusLine();
    if (statusLine.getStatusCode() != 200) {
        builder.down();
        builder.withDetail("statusCode", statusLine.getStatusCode());
        builder.withDetail("reasonPhrase", statusLine.getReasonPhrase());
        return;
    }
    // try-with-resources closes the stream and attaches any close failure as suppressed.
    try (InputStream body = response.getEntity().getContent()) {
        this.doHealthCheck(builder, StreamUtils.copyToString(body, StandardCharsets.UTF_8));
    }
}
报错原因是自检所用client的url不对,指向了【http://localhost:9200】,于是好奇心又让我看了一下ES自动配置的源码
ElasticsearchRestClientAutoConfiguration
ElasticsearchRestClientConfigurations
ElasticsearchRestClientProperties
RestClientBuilderCustomizer
在ElasticsearchRestClientConfigurations发现了猫腻,如下代码
/**
 * Supplies the low-level {@link RestClient} bean when none is defined yet. If the
 * provider resolves to a unique {@link RestHighLevelClient}, that client's low-level
 * client is reused; otherwise a new client is built from the given builder.
 *
 * @param builder the builder used when no unique high-level client is available
 * @param restHighLevelClient provider of the high-level client candidates
 * @return the low-level client to register
 */
@Bean
@ConditionalOnMissingBean
RestClient elasticsearchRestClient(RestClientBuilder builder,
        ObjectProvider<RestHighLevelClient> restHighLevelClient) {
    RestHighLevelClient uniqueClient = restHighLevelClient.getIfUnique();
    if (uniqueClient == null) {
        // No single candidate could be determined: fall back to the builder.
        return builder.build();
    }
    return uniqueClient.getLowLevelClient();
}
这段代码表示:如果当前应用中存在多个RestHighLevelClient实例(且没有唯一的首选实例),getIfUnique()会返回null,于是通过RestClientBuilder新建一个RestClient;如果只有一个实例,则直接取该实例的底层RestClient。这也就解释了为什么只有一个客户端时不报错,而存在多个客户端时,自检用的RestClient取到了ElasticsearchRestClientProperties中的默认配置
// Initial value of the URI list: a single entry pointing at http://localhost:9200.
private List<String> uris = new ArrayList<>(Collections.singletonList("http://localhost:9200"));

// Credentials have no initial value.
private String username;
private String password;

// Initial timeouts: 1 second for connecting, 30 seconds for reading.
private Duration connectionTimeout = Duration.ofSeconds(1);
private Duration readTimeout = Duration.ofSeconds(30);
三.解决方法
1.如果存在多个ES去掉自检(不推荐)
management.health.elasticsearch.enabled=false
2.指定一个默认的RestHighLevelClient,让自检使用这个默认的ES集群
第一种方法:Bean的名字保持和默认的相同,即restHighLevelClient
/**
 * Registers the high-level client under the bean name {@code restHighLevelClient},
 * configured with the user name/password and the connect/socket timeouts held by
 * the enclosing configuration class.
 *
 * @return a new {@link RestHighLevelClient}
 */
@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
    final CredentialsProvider basicAuth = new BasicCredentialsProvider();
    basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

    // Installs the credentials provider on the underlying async HTTP client.
    RestClientBuilder.HttpClientConfigCallback withCredentials =
            (HttpAsyncClientBuilder httpClientBuilder) ->
                    httpClientBuilder.setDefaultCredentialsProvider(basicAuth);

    // Applies the configured connect and socket timeouts to every request.
    RestClientBuilder.RequestConfigCallback withTimeouts =
            requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(connctionTimeout)
                    .setSocketTimeout(socketTimeout);

    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(url, port))
            .setHttpClientConfigCallback(withCredentials)
            .setRequestConfigCallback(withTimeouts);
    return new RestHighLevelClient(restClientBuilder);
}
第二种方法:设置@Primary,让它作为默认的
/**
 * Registers the high-level client as the {@code @Primary} candidate, configured with
 * the user name/password and the connect/socket timeouts held by the enclosing
 * configuration class.
 *
 * @return a new {@link RestHighLevelClient}
 */
@Bean
@Primary
public RestHighLevelClient esClient() {
    final CredentialsProvider basicAuth = new BasicCredentialsProvider();
    basicAuth.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

    // Installs the credentials provider on the underlying async HTTP client.
    RestClientBuilder.HttpClientConfigCallback withCredentials =
            (HttpAsyncClientBuilder httpClientBuilder) ->
                    httpClientBuilder.setDefaultCredentialsProvider(basicAuth);

    // Applies the configured connect and socket timeouts to every request.
    RestClientBuilder.RequestConfigCallback withTimeouts =
            requestConfigBuilder -> requestConfigBuilder
                    .setConnectTimeout(connctionTimeout)
                    .setSocketTimeout(socketTimeout);

    RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(url, port))
            .setHttpClientConfigCallback(withCredentials)
            .setRequestConfigCallback(withTimeouts);
    return new RestHighLevelClient(restClientBuilder);
}
3.重写ES自检程序(让多个ES集群都进行自检)
这块感兴趣的同学可以自行扩展,让2个集群都实现健康检查