总结

客户端发送请求:

  • 调用某个接口的方法会调用之前生成的代理类,代理类会通过 cluster(默认是 FailoverCluster) 从 Directory 获取一堆 invokers
  • 然后会经过 router 路由的过滤(看配置也会添加 mockInvoker 用于服务降级)、然后再通过 SPI 得到 LoadBalance 进行负载均衡选择一个 Invoker 发起远程调用
  • 封装 request 请求并记录此请求和请求的 ID 等待服务端的响应,再通过 NettyClient 发起远程调用。

服务端接收请求:

  • 服务端接受请求之后,根据协议得到信息并反序列化成对象,再按照派发策略派发消息,默认是 ALL,全部请求扔给业务线程池
  • 业务线程池会根据消息类型判断得到 serviceKey,从之前服务暴露的 exporterMap 中获取相应的 exporter,得到Invoker,然后最终调用真正的实现类
  • 再组装好结果返回,这个响应会带上之前请求的 ID。客户端收到这个响应之后会通过 ID 找到存储的 Future,然后塞入响应再唤醒等待 future 的线程,最后客户端得到响应,完成远程调用的过程。 alt


客户端源码解析

客户端告知服务端的具体信息应该包含哪些?

  • 首先客户端肯定要告知要调用是服务器的哪个接口,当然还需要方法名、方法的参数类型、方法的参数值,还有可能存在多个版本的情况,所以还得带上版本号
  • 服务端可以清晰的得知客户端要调用的是哪个方法、可以进行精确调用

MockClusterInvoker.invoke()
  • 客户端调用具体的接口会调用生成的代理类,而代理类会生成一个 RPCInvocation 对象调用 MockClusterInvoker.invoke()
  • 判断配置里面有没有配置 mock
  • this.invoker.invoke(invocation) 调用 AbstractClusterInvoker.invoker()
	public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        // 获取 mock 配置
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
          
            // 不走 mock 逻辑
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
          
            // 强制走 mock 逻辑
            result = doMockInvoke(invocation, null);
        } else {
          
            // 调用失败了失败的 mock逻辑
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                ·················
            }
        }
        return result;
    }
AbstractClusterInvoker.invoke()
  • 调用 directory.list() 进行路由过滤
  • 调用 ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension() SPI 机制获取 LoadBalance 实现类
  • 最后调用子类 doInvoke() 默认调用 FailoverClusterInvoker.doInvoker()
  • 这个结构体现 模板方法,因为路由过滤、获取LoadBalance实现类都是每个子类都要执行的,所以抽象到抽象类中
	public Result invoke(final Invocation invocation) throws RpcException {
      
        // 检查是否被销毁
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
      
        // 路由过滤
        List<Invoker<T>> invokers = list(invocation);
      
        // 通过 SPI 机制获取 LoadBalance 实现类
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
      
        // 调用子类方法,使用模板方法
        return doInvoke(invocation, invokers, loadbalance);
    }
AbstractDirectory.list()
  • 通过方法名找到 Invoker,然后经过服务的路由过滤
  • 在进行 LoadBalance 的选择,得到一个 Invoker,默认使用 FailoverClusterInvoker 的容错方式
  • 容错机制、负载均衡
	public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
      
        //  通过方法名找到对应的 Invoker
        List<Invoker<T>> invokers = doList(invocation);
      
        //  路由过滤
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                  
                    //  根据参数判断是否需要进行路由
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
FailoverClusterInvoker.doInvoke()
  • FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且经过路由之后,会让 LoadBalance 从 Invoker 列表中选择一个 Invoker
  • select() 通过负载均衡选择一个 Invoker
  • RpcContext.getContext().setInvokers() 上下文保存调用过的 Invoker
  • invoker.invoke() 发起远程调用、会调用 AbstractInvoker.invoke() 会返回具体实现的子类 DubboInvoker.doInvoke()
	public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
      
        ······················
        
        //  重试次数
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            ························
            //  负载均衡选择一个 Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
          
            //  上下文保存调用过的 Invoker
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 发起调用
                Result result = invoker.invoke(invocation);
                ·············
                return result;
            } catch (RpcException e) {
                ·················
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(·················);
    }
DubboInvoker.doInvoke()
  • 远程调用三种方式:oneway、异步、同步
  • oneway:当你不关心你的请求是否发送成功的情况下,就用 oneway 的方式发送,这种方式消耗最小,啥都不用记、啥都不用管;
  • 异步调用:Dubbo 天然就是异步的,可以看到 client 发送请求之后会得到一个 ResponseFuture,然后把 future 包装一下塞到上下文中,这样用户就可以从上下文拿到这个 future,然后用户可以做了一波操作之后再调用 future.get() 等待结果;
  • 同步调用:Dubbo 框架帮助我们异步转同步了,从代码可以看到在 Dubbo 源码中就调用了 future.get(), 之后就阻塞住了,必须等待结果到了之后才能返回,所以是同步的

Dubbo 本质是异步的,dubbo 框架帮我们转化为同步,区别其实就是 future.get() 在用户代码被调用还是框架代码被调用。

其实我个人总结就是因为会调用 RpcContext.getContext().setFuture() 存储 future,实现了异步调用。而同步就不使用这个上下文,直接等待 currentClient.request() 结果的返回

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
  
        //  设置 path 到 attachment
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        //  选择 client
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            // 是否异步调用
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            // 是否 oneway 方式发送
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            // 获取超时时间
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            
            // 不需要返回值
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent); // 发送
                RpcContext.getContext().setFuture(null); 
                return new RpcResult();
            } else if (isAsync) {
              
                // 异步发送
                ResponseFuture future = currentClient.request(inv, timeout);
                // 设置 future
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); 
                return new RpcResult();
            } else {
                
                // 同步发送
                RpcContext.getContext().setFuture(null);
                // 调用 future.get() 等待
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            ··················
        }
    }
currentClient.request()
  • 会调用 HeaderExchangeChannel.request() 组装 request,
  • 通过 DefaultFuture() 构造 future
  • 再调用 channel.send() 也就是调用子类 NettyClient 发起请求 总结
	public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            ···············
        }
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
          
            // 调用 nettyClient 的 sent 发送请求
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
Request、DefaultFuture

由于是异步,那么 future 通过上下文保存之后,等待响应回来了如何找到对应的 future 呢?

  • 通过唯一 ID,Request 会生成一个全局唯一 ID,然后 future 内部会将自己和 ID 存储到一个 ConcurrentHashMap。这个 ID 发送到服务端之后,服务端也会把这个 ID 返回来,这样通过 ID 再去 ConcurrentHashMap 里面就可以找到对应的 future。

介绍 Request、DefaultFuture:

  • Request类:通过构造函数调用静态方法 newId() 创造唯一ID, 在 newId() 静态方法通过原子类的方式自增
  • DefaultFuture类:通过 ConcurrentHashMap 存储唯一 ID 与 Future 的映射关系,调用 received() 方法,通过 Response 中的 ID 从 map 中获取 Future

 // Request
 public class Request {
   
    // 原子类
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    // 构造函数调用 newId() 创建唯一 ID
    public Request() {
        mId = newId();
    }
    // 通过原子类实现自增 ++ 
    private static long newId() {
        return INVOKE_ID.getAndIncrement();
    }
 }



 // DefaultFuture
 public class DefaultFuture implements ResponseFuture {
    
    // 存储对应的Future
    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
      
        // 将唯一 ID 与 future 的关系存放到 ConcurrentHashMap中
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
   
   
   
   // 接受响应、调用received()
   public static void received(Channel channel, Response response) {
        try {
             
            // 通过 ID 从 map 中获取 future
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn(···················);
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
}


服务端源码解析

服务端接收到请求之后就会解析请求得到消息,这消息有五种派发策略:

默认走 all 策略,也就是所有消息都派发到业务线程池中。

策略 用途
all 所有消息都派发到线程池,包括请求、响应、连接事件、断开事件等
direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行
message 只有请求和响应消息派发到线程池,其他消息均在 IO 线程上执行
execution 只有请求消息派发到线程池,不含响应。其他消息均在 IO 线程上执行
connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其他消息派发到线程池

AllChannelHandler.received()
  • getExecutorService():获取线程池,通过调用 WrappedChannelHandler 类构造函数的 SPI 机制获取自定义的线程池
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
  • 将消息封装成 ChannelEventRunnable() 放入业务线程池中执行
  • 根据 ChannelState 调用对应的处理方法,这里是 ChannelState.RECEIVED,所以调用 handler.received,最终调用 HandlerExchangeHandler.handleRequest()
public void received(Channel channel, Object message) throws RemotingException {
  
        // 获取线程池
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
        		Request request = (Request)message;
                
                //  如果需要返回响应,将错误封装到响应中
        		if(request.isTwoWay()){
        			String msg = ;
        			return;
        		}
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
HandlerExchangeHandler.handleRequest()
  • 通过请求 ID 构造响应 response,并获取请求的信息
  • 通过调用 handler.reply() 调用 DubboProtocol.reply()
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
     
        // 通过请求 ID 构造 Response
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            ······················
        }
        // 获取请求的信息,方法名等
        Object msg = req.getData();
        try {
          
            // 最终调用 DubboProtocol.reply()
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
DubboProtocol.reply()
  • 根据 message 封装成 Invocation
  • 通过 DubboProtocol.this.getInvoker() 根据请求信息获取对应的 Invoker
  • 调用 invoker.invoke(),会通过 ProxyFactory 使用 javassist 动态代理生成 Invoker,通过protocol.export() 生成 exporter。参考:服务暴露
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                ························
            } else {
                Invocation inv = (Invocation)message;
                // 通过 message 获取对应的 Invoker
                Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get("_isCallBackServiceInvoke"))) {
                  // 回调逻辑
                  ·························
                }

                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            
                return invoker.invoke(inv);
            }
        }
DubboProtocol.this.getInvoker()

前提了解:

服务暴露通过ProxyFactory.getInvoker() 生成 Invoker, 包装后调用 protocol.exporter() 生成 exporter,并将 serviceKey、exporter 存放在 exporterMap 中。详细过程参考服务暴露

  • 通过请求信息构造出 serviceKey,从服务暴露的时候存储的 exporterMap 中获取对应的 exporter 服务
  • 通过 exporter.getInvoker() 返回 Invoker
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ····························
        String serviceKey = serviceKey(port, path, (String)inv.getAttachments().get("version"), (String)inv.getAttachments().get("group"));
  
        // 通过 serviceKey 从 map 中获取之前暴露的服务
        DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(·················);
        } else {
          
            // 返回 Invoker
            return exporter.getInvoker();
        }
    }


参考