总结

客户端消费者:通过配置的 URL 信息判断走哪种引入

  1. 本地引入,会通过 InJvmProtocol.refer() 生成 Invoker

  2. 远程引入 {

    • 会通过 Cluster 先从 Directory 获取所有可调用的远程服务的 Invoker 列表,然后进行 router 路由过滤
    • 之后通过 SPI 机制获取 LoadBalance 进行负载均衡获取一个 Invoker
    • 然后再经过Filter做一些统计,通过Client做数据传输,比如netty传输。传输需要经过Codec接口做协议构造,再序列化发往对应的服务提供者。

    }

  3. 最后通过ProxyFactory.getProxy(invoker) 生成代理对象并返回。


服务端提供者:

  • 接收到Codec协议处理,反序列化后将请求扔到线程池处理。某个线层会根据请求找到对应的Exporter,之后通过Filter一层层过滤之后找到对应的Invoker,调用实现类然后原路返回结果。

服务暴露

补充细节:

注册中心进行服务引入
  1. 调用 RegistryProtocol.refer(),获取注册中心实例
  2. 调用 doRefer(), 生成 RegistryDirectory 用于存放注册中心实例等信息,向注册中心注册自身信息,并向订阅 Providers节点、configurators节点、routers节点
  3. 会根据不同的 URL 的参数触发不同的实现类,例如: DubboProtocol.refer() 里面会调用 getClients() 获取client,可以使用 netty 等进行网络通信,最后生成 Invoker

源码总结

通过配置组成 URL ,然后通过自适应得到对应的实现类进行服务引入。

  1. 通过 ReferenceBean.getObject() 调用 ReferenceConfig.init() 加载检查各种配置,并构建在 map 中

  2. 调用 createProxy()

    1. 配置 URL
      • 如果走本地引入,会直接构建本地协议的 URL 进行服务引入
      • 如果配置了 URL 协议,也就是通过 url.getProtocol() 来组装生成 URL
      • 如果没有配置 URL ,会获取注册中心的 URL ,如果有监控中心存放到 map 中,将 map 中,将 map 转化为可查询的字符串作为 URL 中 refer 的属性
    2. 根据 URL 生成 Invoker
      • 如果只有一个 URL ,直接根据 protocol.refer(interfaceClass, url) 生成 Invoker
      • 如果多个 URL ,通过遍历调用 protocol.refer() 生成 Invoker,并且构建 staticDirectory 包装 Invoker , 再通过 cluster.join() 封装成一个 Invoker
    3. 最终根据 ProxyFactory.getProxy(invoker) 生成代理对象并返回。

如果是注册中心,需要向注册中心注册自己的信息,然后订阅注册中心的相关信息,得到远程 Provider 的 ip 等信息,在通过 netty 客户端进行连接。



服务引用

服务引用的大致流程

服务引用的大致流程


服务引入的时机

  1. 饿汉式,通过实现Spring的InitializingBean接口的afterPropertiesSet(),容器通过调用ReferenceBean的 afterPropertiesSet() 时引入服务
  2. 懒汉式,当这个服务被注入到其他类中时启动引入流程,也就是说用到了才会开始服务引入
  3. 默认情况下,Dubbo使用懒汉式引入服务,如果使用饿汉式,可通过配置 dubbo:reference 的init 属性开启

服务引入的三种方式

  1. 本地引入:服务暴露的流程每个服务都会通过本地暴露,走injvm协议。
  2. 直连远程引入:不需要启动配置中心,由Consumer直接配置写死Provider的地址,然后直连即可。
  3. 通过注册中心引入:Consumer通过注册中心得知Provider的相关信息,然后进行服务引入,会涉及到多个注册中心,同一个服务多个提供者的情况,会进行封装、负载均衡、容错等机制。


源码解析过程

ReferenceBean.getObject()
  • ReferenceBean继承ReferenceConfig
  • ReferenceBean.getObject()直接返回ReferenceConfig.get()
  • 调用ReferenceConfig.init(),作用是加载检查各种配置,并构建在map中
  • 调用ReferenceConfig.createProxy()
    
    //  ReferenceBean
    @Override
    public Object getObject() throws Exception {
        return get();
    }
  


    //  ReferenceConfig
    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("Already destroyed!");
        }
        if (ref == null) {
            init();
        }
        return ref;
    }



	//  ReferenceConfig
	private void init() {
        
      
      	······················
          
          
        //  加载各种配置,存放到map中
        if (application != null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        checkApplication();
        checkStubAndMock(interfaceClass);
        Map<String, String> map = new HashMap<String, String>();
        Map<Object, Object> attributes = new HashMap<Object, Object>();
        map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        
      
        ························
          

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
      
        // 创建代理
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }
if (ref == null) 存在的问题?
  • 调试的时候发现这个ref竟然不等于null,因此无法进入init() 里面调试。
  • 然后发现是IDEA 为了显示对象信息,会通过toString方法获取对象对应的信息,这个方法会通过反射调用ReferenceBean 的getObject() 因为以get开发的方法,进而触发了引入服务动作。
  • 2.65以后的版本修改了这一问题。 alt

createProxy

配置 URL:

  • 如果走本地的话,直接构建个走本地协议的 URL,然后进行服务引入
  • 如果配置了 URL,会根据配置URL 协议,也就是url.getProtocol()来组装生成 URL;如果没有配置 URL,会获取注册中心的 URL,如果有监控中心存放到map中,将map转化为可查询的字符串作为URL 中refer 的属性

根据 URL 生成 Invoker:

  • 如果只有一个 URL ,直接根据 refprotocol.refer(interfaceClass, url) 生成 Invoker
  • 如果多个 URL ,通过遍历调用refprotocol.refer 生成 Invoker,并且构建 staticDirectory 包装 Invoker,再通过 cluster.join() 封装成一个 Invoker
  • 最后,proxyFactory.getProxy(invoker) 生成代理对象,封装 Invoker 返回服务引用,之后Consumer 调用的就是这个代理类
private T createProxy(Map<String, String> map) {
  		
  		//  先创建一个临时URL
        URL tmpUrl = new URL("temp", "localhost", 0, map);
        final boolean isJvmRefer;
  
  
  		//  看是不是走本地的 injvm,如果配置里面写了 URL 肯定不是本地
        if (isInjvm() == null) {
            if (url != null && url.length() > 0) { 
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                isJvmRefer = true;
            } else {
                isJvmRefer = false;
            }
        } else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            
            
          
            //  本地引用
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            
          
          
            //  如果配置里面设置了 URL,那么要么是点对点直连、要么是配置中心地址
            if (url != null && url.length() > 0) { 
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                          
                          
                        //  如果是注册中心地址,将map转化为字符串,并作为 refer 参数添加到 url 中
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            
                          
                            //  如果是点对点会合并 URL,移除服务提供者的一些配置
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { 
              
              
                // 获取注册中心的地址,与服务暴露调用方法一致,区别在于参数是false 表明不是provider
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                      
                      
                        // 获取监控中心
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                      
                      
                        // 向 URL 中添加 refer 参数,将map转化为可查询字符串
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls == null || urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }

          
          
          
            // 根据各种参数来组装 URL
            if (urls.size() == 1) {
              
                //  如果只有一个 URL 直接转换成 Invoker
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    
                    //  多个 URL 依次转化成 Invoker
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        
                        // use last registry url 使用最后一个注册中心的地址
                        registryURL = url; 
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    // 指定用 AvailableCluster
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                  
                    // 创建StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并,只暴露一个Invoker 便于调用
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

  
        ················
          
          
        // create service proxy  通过代理封装 Invoker 返回代理
        return (T) proxyFactory.getProxy(invoker);
    }


如何通过注册中心的进行服务引入?

  • 针对通过 RegistryProtocol.refer() 获取到 Provider地址的详解
  • 触发不同的具体实现类,例如:DubboProtocol.refer() 调用 netty 等进行网络通信获取到 client,并封住为 Invoker

RegistryProtocol.refer()
  • 通过SPI机制,调用对应的protocol.refer 来得到相应的 Invoker。例如:本地引入 InjvmProtocol、远程引入 DubboProtocol。
  • 根据URL 的参数,会先调用RegistryProtocol.refer(),获取注册中心实例
  • 调用doRefer()

    //  RegistryProtocol.refer() 
	@SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
      
        //  取 registry 参数值,并将其设置为协议头
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
      
        //  获取注册中心实例
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

doRefer
  • 生成RegistryDirectory,存放注册中心实例、protocol
  • 向注册中心注册自身的信息,并且向注册中心订阅 providers节点、configurators节点、routers节点,订阅之后RegistryDirectory会收到这几个节点下的信息,就会根据URL 中的协议触发不同的实现类,例如:调用DubboProtocol.refer() 生成 DubboInvoker
  • 通过cluster 再包装一下 Invoker,因为一个服务可能有多个提供者,最终在ProviderConsumerRegTable 中记录这些信息,然后返回 Invoker
  • 至此从注册中心获取到 provider的信息,继续执行服务引入
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
  
        //  创建一个directory实例,存放注册中心实例、动态生成protocol
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
  
  
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
           
          
          //  向注册中心注册服务消费者,在 consumers 目录下创建新节点
          registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
  
  
        //  订阅 注册中心的 providers 目录、configurators 目录、routers 目录,订阅好了之后就会触发 DubboProtocol 的refer()
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));
  
  
     	//  利用cluster 封装 directory其实就是封装多个 Invoker
        Invoker invoker = cluster.join(directory);
  
  		//  向提供者消费者表记录信息
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

DubboProtocol.refer
  • 重点在于调用getClients()、生成一个客户端,之后补充接口、URL等信息生成 Invoker
	@Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        
        // 接口、远程服务的URL、创建一个client
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

getClients
  • 用于获取客户端实例,实例类型为 ExchangeClient,底层依赖 Netty 来进行网络通信,并且可以看到默认是共享连接
  • 如果没有client,会执行 initClient()
	private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
      
      
      	//  获取连接数,默认为0,表示未配置
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        if (connections == 0) {
            
            //  默认共享连接
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                
                //  返回共享的客户端
                clients[i] = getSharedClient(url);
            } else {
              
              	//  得到初始化的新客户端
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
initClient
    private ExchangeClient initClient(URL url) {

        // 获取客户端类型,默认为 netty
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

      	// 添加编译码 和 心跳包参数到 URL 中
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // 检查是否有这个类型的客户端
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
          
            // 懒连接的客户端
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
               
              	// 连接远程
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }