Netty 在服务端与客户端的网络通信中,使用的是异步双向通信(双工)的方式,即客户端和服务端可以相互主动发请求给对方,发消息后不会同步等响应。这样就会有一下问题:

  1. 如何识别消息是请求还是响应?
  2. 请求如何正确对应到响应?

1. 如何识别消息是请求还是响应

为了识别消息类型是请求或者响应,我们在消息中加入了 messageType 的属性,在上文我们也提到,这个消息类型在自定义协议的头部,他有几种类型:请求、响应、心跳,我们先来说说请求、响应。

public enum MessageType {
   
    /** * 普通请求 */
    REQUEST((byte) 1),

    /** * 普通响应 */
    RESPONSE((byte) 2),

    /** * 心跳 */
    HEARTBEAT((byte) 3),
    ;
    private final byte value;
}

请求(Request)的核心字段如下:

public class RpcRequest {
   
    /** * 接口名 */
    private String interfaceName;
    /** * 方法名 */
    private String methodName;
    /** * 参数列表 */
    private Object[] params;
    /** * 参数类型列表 */
    private Class<?>[] paramTypes;
    /** * 接口版本 */
    private String version;
}

响应(Response)的核心字段如下:

public class RpcResponse<T> {
   
    /** * 请求id */
    private long requestId;
    /** * 响应码 */
    private Integer code;
    /** * 提示消息 */
    private String message;
    /** * 响应数据 */
    private T data;
}

发送消息的时候,按照消息类型和结构体,将数据组装好,写到 channel 即可。接收消息则要先解码,从消息头拿到消息类型,根据消息类型来反序列化到对应的结构体。

2. 请求如何正确对应到响应

流程图如下:

有几个关键点:

  1. 客户端请求之后拿到 Future
  2. 有一个 Map 存放未响应的请求,Key: RequestId,Value: Future
  3. 服务端响应的数据中,包含了客户端的 RequestId,这是对应的关键
  4. 响应的结果会被 NettyClientHandler.channelRead0 监听到,根据响应的 RequestId 取出对应的 Future
  5. 将结果写到对应的 Future 中
  6. 客户端通过 future.get() 获取到数据

1) 客户端发请求

代码如下:

public class NettyInvoker extends AbstractInvoker {
   

    private final NettyClient nettyClient = NettyClient.getInstance();

    @Override
    protected RpcResult doInvoke(RpcRequest request, URL selected) throws RpcException {
   
        // 获取 Channel
        Channel channel = nettyClient.getChannel(socketAddress);
        // 构造一个空 Future
        CompletableFuture<RpcResponse<?>> resultFuture = new CompletableFuture<>();
        // 构建 RPC 消息,此处会构建 requestId
        RpcMessage rpcMessage = buildRpcMessage(request);
        // 将 request 和 Future 对应放到 Map 中
        UnprocessedRequests.put(rpcMessage.getRequestId(), resultFuture);
        // 发出请求
        channel.writeAndFlush(rpcMessage);
        // 返回结果
        return new AsyncResult(resultFuture);
    }
    // ...
}

返回的 AsyncResult 只是 future 的包装。

public class AsyncResult implements RpcResult {
   

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
   
        this.future = future;
    }
}

2) 请求暂存

这个存储未响应的请求在 ccx-rpc 中是 UnprocessedRequests 类在管理:

public class UnprocessedRequests {
   
    private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();

    public static void put(long requestId, CompletableFuture<RpcResponse<?>> future) {
   
        FUTURE_MAP.put(requestId, future);
    }
}

3) 服务端响应数据监听

使用 Netty 的 Handler 监听服务端响应的数据,当有数据响应,则调用 UnprocessedRequests.complete 写入。

public class NettyClientHandler extends SimpleChannelInboundHandler<RpcMessage> {
   
    @Override
    protected void channelRead0(ChannelHandlerContext context, RpcMessage requestMsg) {
   
        RpcResponse<?> response = (RpcResponse<?>) requestMsg.getData();
        UnprocessedRequests.complete(response);
    }
}

UnprocessedRequests.complete 实际上就是找出并删除对应的请求,然后将数据写入:future.complete(rpcResponse)

public class UnprocessedRequests {
   
    private static final Map<Long, CompletableFuture<RpcResponse<?>>> FUTURE_MAP = new ConcurrentHashMap<>();

    /** * 完成响应 * * @param rpcResponse 响应内容 */
    public static void complete(RpcResponse<?> rpcResponse) {
   
        CompletableFuture<RpcResponse<?>> future = FUTURE_MAP.remove(rpcResponse.getRequestId());
        if (future != null) {
   
            future.complete(rpcResponse);
        } else {
   
            throw new IllegalStateException("future is null. rpcResponse=" + JSONUtil.toJsonStr(rpcResponse));
        }
    }
}

最后通过 AsyncResult.getData 可以获取到数据。

public class AsyncResult implements RpcResult {
   

    private final CompletableFuture<?> future;

    public AsyncResult(CompletableFuture<?> future) {
   
        this.future = future;
    }

    @Override
    public Object getData() {
   
        try {
   
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
   
            log.error("getData error.", e);
        }
        return null;
    }
}

总结

Netty 网络通信是异步双工的,我们需要用正确 Request-Response 模型让客户端和服务端正确交互。

  1. 如何区分请求或响应?
    在消息中,可以加入 messageType 字段用来区分是请求或者响应。
  2. 如何把请求和响应对应?
    发出的请求需要用 RequestId 标记并用 Map 存起来。服务端收到请求之后,将 RequestId 原封不动写到响应结果中。客户端收到响应结果后,拿出 RequestId 找到对应的 Future 并写入结果。

ccx-rpc 代码已经开源
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc