Dubbo 系列之服务订阅(二)

一码到底 2020年08月21日 26次浏览

服务订阅,阅读代码前的一些思考?

思考的过程和设计思想如下:

1、我们想要进行远程服务的调用,那么肯定要建立网络连接,不妨改用TCP长连接,并设计通信协议,并封装为一个类,不妨叫做ExchangeClient。用它来进行网络通信。

2、有了可以进行远程通信的服务对象ExchangeClient后,我们可以把远程服务封装为一个Invoker对象,这个Invoker对象内部采用自已定义的协议与远程服务器通信,不妨叫做DubboInvoker,因为采用了dubbo协议来进行网络通信的。

3、有了这个DubboInvoker 我就可以根据dubbo协议与远程服务通信了,但是我还想在本地增加一些过滤器Filter,或者监听器Listener。没关系,直接通过责任链模式,把这些Filter与这个DubboInvoker进行链接。返回的一个ProtocolFilterWrapper对象。

4、同理,如果需要一些监听器的功能怎么办,同样进行一次封装。把ProtocolFilterWraper封装到Listener类型的Invoker对象,不妨叫做ListenerInvokerWrapper。

5、现在考虑远程服务提供者有很多个,那么我对每个远程服务都需要有一个ListenerInvokerWrapper的对象。如下:
Demoservice::196.254.324.1 ListenerInvokerWrapper1
Demoservice::196.254.324.2 ListenerInvokerWrapper2
Demoservice::196.254.324.3 ListenerInvokerWrapper3
Demoservice::196.254.324.4 ListenerInvokerWrapper4
Demoservice::196.254.324.5 ListenerInvokerWrapper5
.....

6、服务太多了,在本地这样创建太费事了。引入了注册中心,直接把服务注册到服务中心上,然后客户端直接从注册中心拉取。我们把拉取到的服务,统称为服务目录。并且它是从注册中心拉取到的,那么不妨名字就叫做RegistryDirectory。那么这个服务目录里肯定包含了上面的远程服务调用对象ListenerInvokerWrapper。我们把这些对象放到服务目录的成员上,名字就叫做urlInvokerMap。key: Demoservice::xxxx。value:ListenerInvokerWrapper。

7、现在我们可以在本地调用RegistryDirectory对象,与远程服务通信了,想调哪个服务就从
urlInvokerMap取出一个进行调用即可。但是每次指定一个远程服务器,不仅太麻烦了,而且也会造成流量不均匀,负责不平衡。那么我们就通过通过负载均衡策略来选择一个服务调用。就取名LoadBalance吧。他有个方法select。入参就是我们的服务目录RegistryDirectory。那么通过LoadBalance.select(RegistryDirectory) 得到一个我们想要的通信的远程服务即可。目前负载均衡算法有一致性Hash算法,随机算法、权重轮训算法、最短响应时间算法、最少活跃数算法。

8、有了负载均衡算法LoadBalance后,我想要这样的功能,当服务调用失败的时候,我可以重试,或者直接直接失败。那我就把有这种能力服务调用,称为一个集群Cluster。他有一个方法叫做join。入参还是服务目录RegistryDirectory。返回一个具有快速失败、或者重试的服务调用,不妨叫AbstractClusterInvoker。每个不同的策略都去实现它。并且这个对象内部通过LoadBalance来选择一个服务进行调用,失败后的策略(是否重试或失败)由我决定。

9、目前我们已经有了一个XXXclusterInvoker 对象,它具有快速失败或者重试等功能,且具有负载均衡算法的远程服务调用对象。但是有时,这些远程服务提供者这的qps不达标,或者新上线的服务有问题,或者远程服务调用失败后,可以在本地模拟的调用,返回一个mock对象。那么我们重新对XXXclusterInvoker进行封装,就命名为MockClusterInvoker,具有Mock功能,且具有集群能力。它持有我们的服务目录RegistryDirectory和XXXclusterInvoker对象。

10、目前我们已经有了一个MockClusterInvoker对象。但是这个invoker对象和我们像本地一样调用服务还是有点差别,最后我们直接通过Java的动态代理计算Proxy.newInstance()来创建一个具体的服务对象DemoService,并且在InvokeHandler内部调用我们的MockClusterInvoker对象的invoke 方法。

11、比如我们的DubboInvoker是通过Java 异步线程CompletableFuture实现的话,如果需要转为同步,还可以对其封装从异步转为同步的Invoker,不妨命名为AsyncToSyncInvoker。

则最终在服务消费端呈现给我们如下一个远程服务代理对象。MockClusterInvoker

#ReferenceBean#getObject()
在上一章节,已经说明了getObject()对象的调用时机,内部调用的ReferenceConfig#init方法,该init()方法主要做了如下几件事情:
1、配缺省的配置进行填充,比如registry,application等属性。
2、校验配置是否填写正确,比如<dubbo:reference />中的stub 和mock 是否配置,配置了是否正确。
3、通过SPI机制获取Protocol$Adaptive自适应协议,通过Protocol$Adaptive#refer()方法得到一个MockClusterInvoker对象。该方法的调用内容基本和上面的猜想设计一直。
1)和注册中心建立tcp连接。
2)把当前的订阅服务注册到注册中心上的consumer节点上。
3)从注册中心中把订阅的服务列表拉取到本地,即RegistryDirectory。
4)根据上面类似猜想创建MockClusterInvoker返回。
4、通过SPI机制获取ProxyFactory$Adaptive自适应代理工厂,然后通过这个代理工厂创建动态代理对象,并把这个代理对象赋值给ref属性。

##REF_PROTOCOL.refer(interfacere,registryUrl)
服务的订阅核心就是这条语句,这条语句博大精深。仅仅一条语句把所有的订阅工作完成了。
1、首先根据SPI机制获取自适应的协议对象。语句如下:
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
该语句创建了Protocol$Apdative。它有个自适应refer方法如下:

@Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (type == null)
            throw new IllegalArgumentException("url == null");

        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");

        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.refer(type, url);
    }

2、Protocol$Apdative#refer()方法内部又通过参数的url的协议头和SPI机制获取一个具体的协议。显而易见,url.getProtocol()返回的是registry。因为当前是服务订阅。所以是registry打头。那么返回的Protocol具体类型就是RegistryProtocol。但是Protocol扩展点有包裹类型:ProtocolListenerWrapper、ProtocolFilterWrapper。所以最终返回的是ProtocolListenerWrapper类型的协议。查看这个2个包裹类型的refer()方法:

类ProtocolListenerWrapper

  public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, INVOKER_LISTENER_KEY)));
    }

类ProtocolFilterWrapper

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    }

3、所以Protocol$Apdative#refer()内部的getExtension返回的是ProtocolListenerWrapper的Protocol。又因为url是注册url,所以满足UrlUtils.isRegistry(url)==true.直接进行一次传递调用。

4、最终调到RegistryProtocol#refer()。代码如下:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        url = getRegistryUrl(url);
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

即得到注册中心Registry,一般是ZookeeperRegistry。获取注册中心的内容在之前的章节已见过,就不在多说了。接着会调用doRefer()方法。
在看doRefer()方法之前,我们来看下其定义:
Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url);
出参:
返回值就是我们需要的Invoker对象。
入参:
cluster:集群对象Cluster$Adaptive,通过Spi获取.内部getExtension获取Cluster,默认为FailoverCluster。
registry:注册中心
type:订阅的接口类型
url:服务注册链接注册中心URL。

5、Cluster的join 接口如下:
Invoker join(Directory directory) throws RpcException。
Cluster$Adaptive#join()内部实际是默认调用的是FailoverCluster#join()。
并且Cluster扩展点也有其Wrapper类,即MockClusterWrapper。所以Cluster$Adaptive#join()的方法调用
Cluster extension = ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(extName);
返回的extension是MockClusterWrapper,MockClusterWrapper#join()代码如下:

        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

所以Cluster$Adaptive#join()返回的Invoker类型是MockClusterInvoker。MockClusterWrapper持有的cluster是FailoverCluster,所以MockClusterInvoker内部持有invoker类型是FailoverClusterInvoker。

6、源码doRefer()

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // new 一个服务目录,订阅服务类型为type 的 RegistryDirectory
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // 设置注册中心
        directory.setRegistry(registry);
        
        //设置协议,即Protocol$Adaptive
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        
        //获取订阅参数
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        
        //构建订阅URL ,以consumer//打头
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        
        //把该url注册到注册中心上
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //设置路由链
        directory.buildRouterChain(subscribeUrl);
        
        //重点,重中之重。这里订阅服务,并且会拉取远程服务invoker 到directory对象的urlInvokerMap成员中。
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        
        //由上面分析,得到是MockClusterInvoker
        Invoker<T> invoker = cluster.join(directory);
        
        //查找注册协议监听器,没有设置为空
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        // 如果有其监听器进行监听器onRefer()调用,并返回RegistryInvokerWrapper包裹类型。
        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

#本地动态代理对象创建createProxy()

/**
     * 核心,通过配置的元信息,创建一个代理对象
     * @param map
     * @return
     */
    @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
    private T createProxy(Map<String, String> map) {
        // 首先判断本地是否有Service提供者,
        if (shouldJvmRefer(map)) {
            //如果有,导出jvm导出refer
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            //指定服务提供者URL。点对点比如在<dubbo:reference url="dubbo://xxxxx:12222">
            if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        if (UrlUtils.isRegistry(url)) {
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
                // if protocols not injvm checkRegistry

                //如果不是jvm 协议,一般是dubbo
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    //检测注册中心
                    checkRegistry();
                    //根据注册中心地址,得到注册服务
                    //registry://106.52.187.48:2181/org.apache.dubbo.registry.RegistryService
                    // ?application=dubbo-demo-annotation-consumer&dubbo=2.0.2&pid=9757&registry=zookeeper&timestamp=1597380362736
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            //对每个注册中心URL,得到监控URL
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {
                        throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                    }
                }
            }
            //如果注册中心之一一个的话,一般就一个注册中心
            if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                //多个注册中心时,Protocol$Adaptive
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    //把其得到的Invoker 填入invokers
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }

                //多注册中心,多订阅场景
                if (registryURL != null) { // registry url is available
                    // for multi-subscription scenario, use 'zone-aware' policy by default

                    URL u = registryURL.addParameterIfAbsent(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // The invoker wrap relation would be like: ZoneAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, routing happens here) -> Invoker
                    //通过集群,返回一个invoker
                    invoker = CLUSTER.join(new StaticDirectory(u, invokers));
                } else { // not a registry url, must be direct invoke.
                    invoker = CLUSTER.join(new StaticDirectory(invokers));
                }
            }
        }

        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy();
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        /**
         *
         * 这里是发布元数据信息
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        //通过动态代理把invoker 转化为具体的服务类型
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

上面核心的代码invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0))已分析,接下下来就是通过PROXY_FACTORY.getProxy()创建活动,之后服务调用上进行分析。其他雷士元数据的注册,等之后讲解配置中心时进行讲解。
接下来,以一个图解来描述服务订阅的过程。在下一章节来描述如何具体的拉取远程服务invoker到服务目录RegistryDirectory上的urlInvokerMap。
INVOKER.png