前情提要

JETCD(Java客户端)无法直接使用cfssl生成的.pem私钥(仅私钥受影响,证书/公钥所需的格式是一致的,无需转换).cfssl默认生成的**-key.pem私钥是pkcs#1格式,而netty只能读取pkcs#8格式的私钥,因此所需的KeyFile必须是pkcs#8格式的.key文件.

使用如下命令进行转换

netty所需私钥需要将pkcs#1的.pem私钥转换为pkcs#8的.key格式的私钥.

openssl pkcs8 -topk8 -nocrypt -in client-key.pem -out client.key

JETCD设置SSL(准备好CA证书、客户端证书、私钥)

根据客户端ClientBuilder提供的方法可以知道,我们需要创建SslContext并将其设置到客户端Client上来启用SSL

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.netty.handler.ssl.*;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/**
 * Builds the client-side {@link SslContext} used to talk to etcd over TLS.
 *
 * <p>The CA certificate, the client certificate chain and the PKCS#8 client private key are
 * read from the classpath as streams rather than as {@code File}s, so the lookup also works
 * when the resources are packaged inside a jar (where {@code ResourceUtils.getFile} fails).
 *
 * @return a client SslContext with ALPN/HTTP-2 enabled and the client key material installed
 * @throws SSLException          if Netty cannot build the context
 * @throws FileNotFoundException if one of the classpath resources cannot be found
 * @throws UncheckedIOException  if a resource exists but cannot be opened or closed
 */
public SslContext openSslContext() throws SSLException, FileNotFoundException {
    // ALPN is mandatory: without it the call fails with
    // "ALPN must be enabled and list HTTP/2 as a supported protocol."
    // The configuration names the protocol to negotiate and how negotiation failures are handled.
    ApplicationProtocolConfig alpn = new ApplicationProtocolConfig(ApplicationProtocolConfig.Protocol.ALPN,
                                ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
                                ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
                                ApplicationProtocolNames.HTTP_2);
    // CA certificate, client certificate chain, client private key (PKCS#8).
    try (InputStream caCert = ResourceUtils.getURL("classpath:ca/ca.pem").openStream();
         InputStream clientCertChain = ResourceUtils.getURL("classpath:ca/reader.pem").openStream();
         InputStream clientKey = ResourceUtils.getURL("classpath:ca/reader.key").openStream()) {
        return SslContextBuilder
                .forClient()
                // ALPN settings
                .applicationProtocolConfig(alpn)
                // which SSL implementation to use
                .sslProvider(SslProvider.OPENSSL)
                // CA certificate used to verify the server
                .trustManager(caCert)
                // client certificate chain and its private key
                .keyManager(clientCertChain, clientKey)
                .build();
    } catch (SSLException | FileNotFoundException e) {
        // Already part of the declared contract: pass through untouched.
        throw e;
    } catch (IOException e) {
        // Opening/closing a stream failed; keep the declared signature and preserve the cause.
        throw new UncheckedIOException("Unable to read TLS material from the classpath", e);
    }
}

/**
 * Builds a jetcd {@link Client} for the configured endpoints with TLS enabled.
 *
 * @return a client using the SslContext from {@code openSslContext()} and a fixed authority
 * @throws SSLException          propagated from {@code openSslContext()}
 * @throws FileNotFoundException propagated from {@code openSslContext()}
 */
public Client etcdClient() throws SSLException, FileNotFoundException {
    ClientBuilder builder = Client.builder();
    // Server addresses: the configured value is a comma-separated list.
    builder.endpoints(etcdProps.getServerAddr().split(StringPool.COMMA));
    // Author's note: once the server enforces TLS client authentication these credentials are
    // meaningless, because etcd then takes the CN of the client certificate as the user name.
    if (etcdProps.getAuthority()) {
        // Encode explicitly as UTF-8 through the byte[] factory, the same way EtcdTool.bytesOf does.
        ByteSequence user = ByteSequence.from("username".getBytes(StandardCharsets.UTF_8));
        ByteSequence pwd = ByteSequence.from("password".getBytes(StandardCharsets.UTF_8));

        builder.user(user);
        builder.password(pwd);
    }
    // The authority is required: it must be one of the host names the server certificate was
    // issued for. Verifying that the certificate's name matches the name being accessed is the
    // key step of an HTTPS-style handshake.
    builder.sslContext(openSslContext())
            .authority("etcdcluster.com");
    return builder.build();
}

POM依赖

<!-- jetcd client. Its transitive netty-handler is excluded here and declared explicitly
     further down, so the netty-handler version is chosen by this POM. -->
<dependency>
    <groupId>io.etcd</groupId>
    <artifactId>jetcd-core</artifactId>
    <version>0.5.0</version>
    <exclusions>
        <exclusion>
            <artifactId>netty-handler</artifactId>
            <groupId>io.netty</groupId>
        </exclusion>
    </exclusions>
</dependency>
<!-- netty-tcnative (BoringSSL, statically linked), needed only at runtime.
     NOTE(review): presumably this is what backs SslProvider.OPENSSL used when building the
     SslContext; its version has to be compatible with netty-handler. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-tcnative-boringssl-static</artifactId>
    <version>2.0.26.Final</version> <!-- See table for correct version -->
    <scope>runtime</scope>
</dependency>
<!-- Explicit netty-handler version replacing the one excluded from jetcd-core above. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-handler</artifactId>
    <version>4.1.42.Final</version>
</dependency>

ETCDTOOL

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import lombok.SneakyThrows;
import org.springframework.util.CollectionUtils;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Static helper around the key-value API of a jetcd {@link Client}.
 *
 * <p>A client has to be registered with {@link #setClient(Client)} before any operation is
 * called. Every operation blocks until etcd answers or the configured timeout (1000 ms unless
 * changed through {@link #setTimeOut(long)}) elapses; failures of the underlying future are
 * rethrown as-is via {@code @SneakyThrows}.
 *
 * <pre>{@code
 * String key = "/myapp/database/user";
 * String value = "Reuben";
 * if (EtcdTool.put(key, value)) {
 *     System.out.println(EtcdTool.getSingle(key));
 * }
 * }</pre>
 */
public class EtcdTool {

    /** Unit in which the timeout is expressed; it is never reassigned, hence a constant. */
    private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;

    private static Client client;
    private static long timeoutMillis = 1000L;

    /**
     * Registers the client every operation of this class goes through.
     *
     * @param client the jetcd client to use
     */
    public static void setClient(Client client) {
        EtcdTool.client = client;
    }

    /**
     * Sets how long each operation waits for etcd to answer.
     *
     * @param timeout timeout in milliseconds
     */
    public static void setTimeOut(long timeout) {
        EtcdTool.timeoutMillis = timeout;
    }

    /**
     * Converts a string into the {@link ByteSequence} form jetcd requires for keys and values.
     *
     * @param val the text to convert; may be a key or a value
     * @return the UTF-8 encoded bytes of {@code val}
     * @throws NullPointerException if {@code val} is null
     */
    public static ByteSequence bytesOf(String val) {
        Objects.requireNonNull(val, "val must not be null");
        return ByteSequence.from(val.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Decodes a {@link ByteSequence} as UTF-8 text.
     *
     * @param byteSequence the bytes to decode
     * @return the decoded string
     */
    public static String toString(ByteSequence byteSequence) {
        return byteSequence.toString(StandardCharsets.UTF_8);
    }

    /**
     * Tells whether the given key currently exists.
     *
     * @param key the key to look up
     * @return {@code false} for a null or empty key, otherwise whether etcd reports at least one match
     */
    @SneakyThrows
    public static Boolean hvKey(String key) {
        // Unlike the other operations, a missing key name is answered with false, not an exception.
        if (isNullOrEmpty(key)) {
            return false;
        }
        GetResponse response = requireClient().getKVClient().get(bytesOf(key)).get(timeoutMillis, TIME_UNIT);
        return response.getCount() > 0;
    }

    /**
     * Stores a key-value pair.
     *
     * @param key   the key to write
     * @param value the value to store
     * @return {@code true} when a response was received
     * @throws NullPointerException if {@code key} is null or empty, or {@code value} is null
     */
    @SneakyThrows
    public static Boolean put(String key, String value) {
        requireNonEmpty(key, "key");

        CompletableFuture<PutResponse> future = requireClient().getKVClient().put(bytesOf(key), bytesOf(value));
        PutResponse response = future.get(timeoutMillis, TIME_UNIT);

        return null != response;
    }

    /**
     * Reads the value stored under a key.
     *
     * @param key the key to read
     * @return the value decoded as UTF-8, or {@code null} when the key has no entry
     * @throws NullPointerException if {@code key} is null or empty
     */
    @SneakyThrows
    public static String getSingle(String key) {
        requireNonEmpty(key, "key");

        GetResponse response = requireClient().getKVClient().get(bytesOf(key)).get(timeoutMillis, TIME_UNIT);
        if (null == response || response.getCount() <= 0) {
            return null;
        }
        return toString(response.getKvs().get(0).getValue());
    }

    /**
     * Reads every key-value pair whose key starts with the given prefix.
     *
     * @param prefix the key prefix to match
     * @return a map from key to value (both decoded as UTF-8); empty when nothing matches
     * @throws NullPointerException if {@code prefix} is null or empty
     */
    @SneakyThrows
    public static Map<String, String> getWithPrefix(String prefix) {
        requireNonEmpty(prefix, "prefix");

        ByteSequence prefixBytes = bytesOf(prefix);
        GetOption byPrefix = GetOption.newBuilder().withPrefix(prefixBytes).build();
        GetResponse response = requireClient().getKVClient().get(prefixBytes, byPrefix).get(timeoutMillis, TIME_UNIT);

        Map<String, String> kvMap = new HashMap<>();
        if (null != response && response.getCount() > 0) {
            response.getKvs().forEach(kv -> kvMap.put(toString(kv.getKey()), toString(kv.getValue())));
        }
        return kvMap;
    }

    /**
     * Deletes a single key.
     *
     * @param key the key to delete
     * @return {@code true} when etcd reports at least one deleted entry
     * @throws NullPointerException if {@code key} is null or empty
     */
    @SneakyThrows
    public static Boolean delSingle(String key) {
        requireNonEmpty(key, "key");

        long deleted = requireClient().getKVClient().delete(bytesOf(key)).get(timeoutMillis, TIME_UNIT).getDeleted();
        return deleted > 0;
    }

    /**
     * Deletes every key that starts with the given prefix.
     *
     * @param prefix the key prefix to match
     * @return the number of deleted entries as reported by etcd
     * @throws NullPointerException if {@code prefix} is null or empty
     */
    @SneakyThrows
    public static long delWithPrefix(String prefix) {
        requireNonEmpty(prefix, "prefix");

        ByteSequence prefixBytes = bytesOf(prefix);
        DeleteOption byPrefix = DeleteOption.newBuilder().withPrefix(prefixBytes).build();

        return requireClient().getKVClient().delete(prefixBytes, byPrefix).get(timeoutMillis, TIME_UNIT).getDeleted();
    }

    /**
     * Applies a batch of deletes and puts inside one etcd transaction. Publish/rollback flows
     * are expected to go through this method so the batch is applied together.
     *
     * <p>Each key is built as {@code keyPrefix + "/" + name}. No compare conditions are added;
     * deletes are registered first, then puts, all in the transaction's Then branch.
     *
     * @param delKeys       key names (relative to {@code keyPrefix}) to delete; may be null or empty
     * @param addOrUpdateKV key names (relative to {@code keyPrefix}) and values to write; may be null or empty
     * @param keyPrefix     prefix prepended to every key name; only dereferenced when there is work to do
     * @return the success flag of the transaction response
     * @throws NullPointerException if {@code keyPrefix}, a key name or a value is null while building an operation
     */
    @SneakyThrows
    public static boolean operationWithTxn(List<String> delKeys, Map<String, String> addOrUpdateKV, String keyPrefix) {
        Txn txn = requireClient().getKVClient().txn();

        if (!CollectionUtils.isEmpty(delKeys)) {
            List<Op.DeleteOp> deleteOps = new ArrayList<>(delKeys.size());
            for (String name : delKeys) {
                deleteOps.add(Op.delete(prefixedKey(keyPrefix, name), DeleteOption.DEFAULT));
            }
            txn.Then(deleteOps.toArray(new Op.DeleteOp[0]));
        }

        if (!CollectionUtils.isEmpty(addOrUpdateKV)) {
            List<Op.PutOp> putOps = new ArrayList<>(addOrUpdateKV.size());
            for (Map.Entry<String, String> entry : addOrUpdateKV.entrySet()) {
                ByteSequence key = prefixedKey(keyPrefix, entry.getKey());
                putOps.add(Op.put(key, bytesOf(entry.getValue()), PutOption.DEFAULT));
            }
            txn.Then(putOps.toArray(new Op.PutOp[0]));
        }

        TxnResponse txnResponse = txn.commit().get(timeoutMillis, TIME_UNIT);
        return txnResponse.isSucceeded();
    }

    /** Returns the registered client, failing with a descriptive NPE when none was set. */
    private static Client requireClient() {
        return Objects.requireNonNull(client, "EtcdTool client is not set; call setClient(Client) first");
    }

    /** Tells whether the text is null or has no characters. */
    private static boolean isNullOrEmpty(String text) {
        return null == text || text.isEmpty();
    }

    /** Rejects a null or empty argument with the NullPointerException callers already expect. */
    private static void requireNonEmpty(String text, String name) {
        if (isNullOrEmpty(text)) {
            throw new NullPointerException(name + " must not be null or empty");
        }
    }

    /** Builds the byte form of {@code keyPrefix + "/" + name}. */
    private static ByteSequence prefixedKey(String keyPrefix, String name) {
        Objects.requireNonNull(keyPrefix, "keyPrefix must not be null");
        Objects.requireNonNull(name, "key name must not be null");
        return bytesOf(keyPrefix.concat(StringPool.SLASH).concat(name));
    }

}

错误提示

  1. 未设置ApplicationProtocolConfig会提示ALPN must be enabled and list HTTP/2 as a supported protocol错误
  2. 使用jetcd-core依赖时,如果未处理好grpc-netty、netty-handler、netty-tcnative-boringssl-static之间的版本兼容性,会提示各种错误,例如java.lang.NoSuchFieldError: SSL_SESS_CACHE_CLIENT、java.lang.ClassNotFoundException: io.netty.internal.tcnative.SSLContext等.可以拉到最下面查看版本兼容信息
  3. 提示未找到匹配名称: No name matching "etcd" found. Jetcd默认设置的DNS名称是etcd,但是我们需要在服务器端设置该host才可以,否则无法找到对应的IP地址.我们可以通过设置builder.authority("定义的服务器端CA地址DNS/IP")来解决这个问题
  4. 别忘记设置SslContext的keyManager中的客户端证书和私钥.不然请求回来的信息无法转换读取,也会报错: javax.net.ssl.SSLHandshakeException: error:10000412:SSL routines:OPENSSL_internal:SSLV3_ALERT_BAD_CERTIFICATE 以及 io.grpc.StatusRuntimeException: UNAVAILABLE: io exception Channel Pipeline: [SslHandler0, ProtocolNegotiatorsClientTlsHandler0, WriteBufferingAndExceptionHandler0, DefaultChannelPipeline$TailContext0]

参考文档:

官方Demo; SSL和TLS介绍; etcd-TLS攻略; 常见的PKI标准(X.509、PKCS)及证书相关介绍; X.509、PKCS文件格式介绍; 官方SSL示例; etcd配置支持SSL+ACL