一.背景

最近在做一个项目,需要接入2个ES集群,于是我初始化了2个RestHighLevelClient实例esClient和esClient1

package com.xxx.common.config;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {

    @Value("${es.url}")
    private String url;

    @Value("${es.port}")
    private Integer port;

    @Value("${es.username}")
    private String username;

    @Value("${es.password}")
    private String password;

    @Value("${es.connection.timeout:30000}")
    private int connctionTimeout;

    @Value("${es.socket.timeout:60000}")
    private int socketTimeout;

    @Bean
    public RestHighLevelClient esClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                }).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
                        .setSocketTimeout(socketTimeout));
        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }
}

结果启动的时候报错了,如下:

java.net.ConnectException: Connection refused
 at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:823)
 at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
 at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
 at org.springframework.boot.actuate.elasticsearch.ElasticsearchRestHealthIndicator.doHealthCheck(ElasticsearchRestHealthIndicator.java:60)
 at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:82)
 at org.springframework.boot.actuate.health.HealthIndicator.getHealth(HealthIndicator.java:37)
 at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:71)
 at org.springframework.boot.actuate.health.HealthEndpoint.getHealth(HealthEndpoint.java:39)
 at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:99)
 at org.springframework.boot.actuate.health.HealthEndpointSupport.getAggregateHealth(HealthEndpointSupport.java:110)
 at org.springframework.boot.actuate.health.HealthEndpointSupport.getContribution(HealthEndpointSupport.java:96)
 at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:74)
 at org.springframework.boot.actuate.health.HealthEndpointSupport.getHealth(HealthEndpointSupport.java:61)
 at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:65)
 at org.springframework.boot.actuate.health.HealthEndpoint.health(HealthEndpoint.java:55)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:282)
 at org.springframework.boot.actuate.endpoint.invoke.reflect.ReflectiveOperationInvoker.invoke(ReflectiveOperationInvoker.java:77)
 at org.springframework.boot.actuate.endpoint.annotation.AbstractDiscoveredOperation.invoke(AbstractDiscoveredOperation.java:60)
 at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:121)
 at org.springframework.boot.actuate.endpoint.jmx.EndpointMBean.invoke(EndpointMBean.java:96)
 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
 at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
 at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
 at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
 at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
 at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
 at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
 at sun.reflect.GeneratedMethodAccessor87.invoke(Unknown Source)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
 at sun.rmi.transport.Transport$1.run(Transport.java:200)
 at sun.rmi.transport.Transport$1.run(Transport.java:197)
 at java.security.AccessController.doPrivileged(Native Method)
 at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
 at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
 at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
 at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
 at java.security.AccessController.doPrivileged(Native Method)
 at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:715)
 at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:174)
 at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:148)
 at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:351)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221)
 at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
 ... 1 common frames omitted

 二.分析原因

为什么只有一个client不会报错呢,如果有2个client自检报错,而且client 里面url是【http://localhost:9200】,待着这样的疑问,我看了一遍ES自检的源码

ElasticsearchRestHealthIndicator

里面主要方法是这个

public ElasticsearchRestHealthIndicator(RestClient client) {
    super("Elasticsearch health check failed");
    this.client = client;
    this.jsonParser = JsonParserFactory.getJsonParser();
}

protected void doHealthCheck(Builder builder) throws Exception {
    Response response = this.client.performRequest(new Request("GET", "/_cluster/health/"));
    StatusLine statusLine = response.getStatusLine();
    if (statusLine.getStatusCode() != 200) {
        builder.down();
        builder.withDetail("statusCode", statusLine.getStatusCode());
        builder.withDetail("reasonPhrase", statusLine.getReasonPhrase());
    } else {
        InputStream inputStream = response.getEntity().getContent();
        Throwable var5 = null;

        try {
            this.doHealthCheck(builder, StreamUtils.copyToString(inputStream, StandardCharsets.UTF_8));
        } catch (Throwable var14) {
            var5 = var14;
            throw var14;
        } finally {
            if (inputStream != null) {
                if (var5 != null) {
                    try {
                        inputStream.close();
                    } catch (Throwable var13) {
                        var5.addSuppressed(var13);
                    }
                } else {
                    inputStream.close();
                }
            }

        }

    }
}

报错原因是client配置里的url不对指向了【http://localhost:9200】,于是好奇心又让我看了一下es自动配置源码

ElasticsearchRestClientAutoConfiguration

ElasticsearchRestClientConfigurations

ElasticsearchRestClientProperties

RestClientBuilderCustomizer

在ElasticsearchRestClientConfigurations发现了猫腻,如下代码

    @Bean
    @ConditionalOnMissingBean
    RestClient elasticsearchRestClient(RestClientBuilder builder, ObjectProvider<RestHighLevelClient> restHighLevelClient) {
        RestHighLevelClient client = (RestHighLevelClient)restHighLevelClient.getIfUnique();
        return client != null ? client.getLowLevelClient() : builder.build();
    }

这段代码表示,如果当前应用存在多个RestHighLevelClient 实例,则会取RestClientBuilder,如果只有一个实例,择取默认的那个。这也是解释了一个不报错,而多个的客户端存在的话,则取到了ElasticsearchRestClientProperties默认的配置

private List<String> uris = new ArrayList(Collections.singletonList("http://localhost:9200"));
private String username;
private String password;
private Duration connectionTimeout = Duration.ofSeconds(1L);
private Duration readTimeout = Duration.ofSeconds(30L);

三.解决方法

1.如果存在多个ES去掉自检(不推荐)

management.health.elasticsearch.enabled=false

2.配置默认RestClientBuilder,让他自检默认ES

第一种方法:名字保持和默认的相同restHighLevelClient

@Bean(name = "restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                }).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
                        .setSocketTimeout(socketTimeout));

        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
        return restHighLevelClient;
}

第二种方法:设置@Primary,让他作为默认的

@Bean
@Primary
public RestHighLevelClient esClient() {
    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

    RestClientBuilder builder = RestClient.builder(new HttpHost(url, port))
            .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }
            }).setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(connctionTimeout)
                    .setSocketTimeout(socketTimeout));
    RestHighLevelClient client = new RestHighLevelClient(builder);
    return client;
}

3.重些ES自检程序(让多个ES集群都进行自检)

这块感兴趣的可以是去扩展,让2个集群都实现健康检查