Dubbo 服务暴露之服务远程暴露——订阅与通知机制

Subscribe/Notify

Posted by Jay on March 3, 2019

Dubbo 服务暴露之服务远程暴露——订阅与通知机制

服务远程暴露的总体步骤为

  • ref封装为Invoker
  • Invoker转换为Exporter
  • 启动Netty服务端
  • 注册服务到Zookeeper
  • 订阅与通知机制
  • 返回新的Exporter实例

在前面几篇文章中已经分析了前四步的过程,下面分析第五步——订阅与通知的过程。总体代码如下(RegistryProtocol.export(final Invoker<T> originInvoker)):

// 订阅服务提供者的override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。 ??
// 获取override数据订阅的url
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 创建提供者动态覆盖配置数据变更的监听器
final OverrideListener overrideSubscribeListener = new  OverrideListener(overrideSubscribeUrl, originInvoker); 
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 缓存
// 进行订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

说明:

  • 第一句代码根据registedProviderUrl来获取overrideSubscribeUrl
  • 第二句代码创建overrideSubscribeListener
  • 第三句代码将{ overrideSubscribeUrl : overrideSubscribeListener}放入缓存 。
  • 第四句代码实现真正的订阅与通知。

一、获取overrideSubscribeUrl

// 获取override数据订阅的url
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);

// 获取 订阅服务提供者动态配置覆盖数据 的url。
 	1 将协议改为provider
 	2 添加参数category=configurators和check=false
// @param registeredProviderUrl 提供者url
private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
    return registeredProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
        .addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY,
                       Constants.CHECK_KEY, String.valueOf(false));
}

开始时的registedProviderUrl如下:

  • dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441

最终的overrideSubscribeUrl如下:

  • provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441

二、创建overrideSubscribeListener

// 创建提供者动态覆盖配置数据变更的监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);

OverrideListenerRegistryProtocol的内部类,来看一下声明和属性:

private class OverrideListener implements NotifyListener {

    // 订阅服务提供者的动态配置数据(订阅条件)
    // 如provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
    private final URL subscribeUrl;
    // DelegateProviderMetaDataInvoker实例(包装AbstractProxyInvoker实例)
    private final Invoker originInvoker;

    public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
        this.subscribeUrl = subscribeUrl;
        this.originInvoker = originalInvoker;
    }
}

这里创建出来的OverrideListener实例属性如下:

  • subscribeUrlprovider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
  • originInvoker:该实例是在ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)创建出来的DelegateProviderMetaDataInvoker实例,包装了AbstractProxyInvoker实例。
    • AbstractProxyInvoker实例
      • proxyDemoServiceImpl2实例
      • typeClass<com.alibaba.dubbo.demo.DemoService>
      • urlregistry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&export=dubbo%3A%2F%2F172.16.132.166%3A20881%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D172.16.132.166%26bind.port%3D20881%26cellinvokemode%3Dsharing%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D73422%26side%3Dprovider%26timestamp%3D1550973009441&group=dubbo_test&pid=73422&registry=zookeeper&timestamp=1550973009427
    • metadata: ServiceBean实例

最后,将创建出来的OverrideListener实例存储在RegistryProtocol的属性Map<URL, NotifyListener> overrideListeners中:

  • key: (overrideSubscribeUrl,也就是subscribeUrlprovider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
  • value: 上述的OverrideListener实例

三、订阅与通知

// 进行订阅
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

这里的registryZookeeperRegistry实例,subscribe(URL url, NotifyListener listener)方法在其父类FailbackRegistry中,如下:

// 订阅数据变更
// @param url      订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
// @param listener 变更事件监听器,不允许为空
@Override
public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // 向服务器端发送订阅请求
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // 有异常,则根据订阅的url获取缓存的数据
        List<URL> urls = getCacheUrls(url);
        if (urls != null && urls.size() > 0) {
            // 用缓存的数据发起通知
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // 如果开启了启动时检测check=true,则直接抛出异常
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // 将失败的订阅请求记录到失败列表,定时重试
        addFailedSubscribed(url, listener);
    }
}

步骤:

  • 首先调用其父类AbstractRegistrysubscribe(URL url, NotifyListener listener)方法,将之前创建出来的overrideSubscribeListener实例加入到overrideSubscribeUrl所对应的监听器集合中。
  • 然后从failedSubscribed/failedUnsubscribedoverrideSubscribeUrl所对应的监听器集合中删除overrideSubscribeListener实例;从failedNotified获取当前url的通知失败map Map<NotifyListener, List<URL>>,之后从中删除掉该NotifyListener实例以及其需要通知的所有的url数据。
  • 之后使用具体的子类(这里是ZookeeperRegistry)向服务器端发送订阅请求。
  • 如果在订阅的过程中抛出了异常,那么尝试获取缓存的url数据,如果有缓存的url数据,则进行失败通知,之后“将失败的订阅请求记录到失败列表,定时重试”,如果没有缓存的url数据,如果开启了启动时检测或者直接抛出的异常是SkipFailbackWrapperException,则直接抛出异常,不会“将失败的订阅请求记录到失败列表,定时重试”。

1.将之前创建出来的overrideSubscribeListener实例加入到overrideSubscribeUrl所对应的监听器集合中(AbstractRegistry类)

// 订阅的url,对应监听器集合
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

// 首先从ConcurrentMap<URL, Set<NotifyListener>> subscribed中获取key为url的集合Set<NotifyListener>,如果该集合存在,直接将当前的NotifyListener实例存入该集合,如果集合不存在,先创建,之后放入subscribed中,并将当前的NotifyListener实例存入刚刚创建的集合
// @param url 订阅条件,不允许为空,如:consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
// @param listener 变更事件监听器,不允许为空
@Override
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);
    }
    listeners.add(listener);
}

2.从失败集合中移除overrideSubscribeListener实例

// 1.从ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed 中获取当前url的订阅失败列表Set<NotifyListener>,之后从中删除掉该NotifyListener实例;
// 2.从ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed 中获取当前url的反订阅失败列表Set<NotifyListener>,之后从中删除掉该NotifyListener实例;
// 3.从ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified 中获取当前url的通知失败map Map<NotifyListener, List<URL>>,之后从中删除掉该NotifyListener实例以及其需要通知的所有url数据。
private void removeFailedSubscribed(URL url, NotifyListener listener) {
    Set<NotifyListener> listeners = failedSubscribed.get(url);
    if (listeners != null) {
        listeners.remove(listener);
    }
    listeners = failedUnsubscribed.get(url);
    if (listeners != null) {
        listeners.remove(listener);
    }
    Map<NotifyListener, List<URL>> notified = failedNotified.get(url);
    if (notified != null) {
        notified.remove(listener);
    }
}

3.实际的订阅逻辑ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)

protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 此分支表示订阅所有服务
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath(); // /dubbo_test
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        // 当前的所有子节点服务
                        for (String child : currentChilds) {
                            child = URL.decode(child); // 得到子节点(服务)
                            if (!anyServices.contains(child)) { // 判断是否已经订阅过
                                anyServices.add(child); 
								// 未订阅,则订阅指定服务下的数据变更                                
 subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            // 持久节点
            zkClient.create(root, false);
            // 添加子节点变更监听器,返回的是最初的子节点列表,即目前存在的服务
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && services.size() > 0) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 分别订阅指定服务下的数据变更
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            // 此分支表示订阅指定服务
            // ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
            // 1.根据url获取ConcurrentMap<NotifyListener, ChildListener>,没有就创建
            // 2.根据listener从ConcurrentMap<NotifyListener, ChildListener>获取ChildListener,没有就创建(创建的ChildListener用来监听子节点的变化)
            // 3.创建path持久化节点
            // 4.添加path子节点监听器
            List<URL> urls = new ArrayList<URL>();
            // toCategoriesPath(url)得到服务类别的路径列表,如 /dubbo_test/com.alibaba.dubbo.demo.DemoService/providers,consumers,routers,configurators
            for (String path : toCategoriesPath(url)) {
                // ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners <订阅条件,<通知监听器,子节点变更监听>> 
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); // 根据url获取listeners
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        // 监听子节点列表的变化
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // 子节点变更通知
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                // 持久节点 /dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators
                zkClient.create(path, false);
                // 添加path下子节点变更的监听器
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    // 先数据匹配
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 再通知
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

说明:

  • url(overrideSubscribeUrl)provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
  • listener:之前创建出来的overrideSubscribeListener实例

订阅指定服务时的步骤:

  • 首先获取category path:实际上就是获取/dubbo_test/{servicename}/{url中的category参数,默认是providers,这里是final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);这句代码中添加到overrideSubscribeUrl上的category=configurators}
private String[] toCategoriesPath(URL url) {
    String[] categroies;
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                                  Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categroies.length];
    for (int i = 0; i < categroies.length; i++) {
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
    }
    return paths; // ["/dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators"]
}
  • 然后获取并创建:ConcurrentMap<overrideSubscribeUrl, ConcurrentMap<overrideSubscribeListener实例, ChildListener>> zkListeners,这里创建出来的ChildListener实例中的childChanged(String parentPath, List<String> currentChilds)方法实际上就是最终当parentPath(实际上就是上边的category path)下的currentChilds发生变化时,执行的逻辑。
  • 之后创建持久化节点:/dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators
  • 然后使用AbstractZookeeperClient<TargetChildListener>addChildListener(String path, final ChildListener listener)方法为path下的子节点添加上边创建出来的ChildListener实例
  • 最后进行通知

4.AbstractZookeeperClient<TargetChildListener>.addChildListener(String path, final ChildListener listener)

// 1.根据path从ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners获取ConcurrentMap<ChildListener, TargetChildListener>,没有就创建。
// 2.根据ChildListener获取TargetChildListener,没有就创建,TargetChildListener是真正的监听path的子节点变化的监听器。createTargetChildListener(String path, final ChildListener listener)方法创建一个真正的用来执行当path节点的子节点发生变化时的逻辑
// 3.addTargetChildListener(path, targetListener):将刚刚创建出来的子节点监听器订阅path的变化,这样之后,path的子节点发生了变化时,TargetChildListener才会执行相应的逻辑。而实际上TargetChildListener又会调用ChildListener的实现类的childChanged(String parentPath, List<String> currentChilds)方法,而该实现类,正好是ZookeeperRegistry中实现的匿名内部类,在该匿名内部类的childChanged(String parentPath, List<String> currentChilds)方法中,调用了ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))。
public List<String> addChildListener(String path, final ChildListener listener) {
    ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
    if (listeners == null) {
        childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
        listeners = childListeners.get(path);
    }
    TargetChildListener targetListener = listeners.get(listener);
    if (targetListener == null) {
        listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
        targetListener = listeners.get(listener);
    }
    return addTargetChildListener(path, targetListener);
}

步骤:

  • 首先是一顿获取和创建:ConcurrentMap<categorypath, ConcurrentMap<ChildListener实例, TargetChildListener>> childListeners,这里主要是创建TargetChildListener
  • 之后为path节点添加TargetChildListener实例。

5.ZkclientZookeeperClient.createTargetChildListener(path, listener)

public IZkChildListener createTargetChildListener(String path, final ChildListener listener) {
    return new IZkChildListener() {
        @Override
        public void handleChildChange(String parentPath, List<String> currentChilds)
                throws Exception {
            listener.childChanged(parentPath, currentChilds);
        }
    };
}

这里创建一个监听path下子节点变化的IZkChildListener实例,当path下有子节点变化时,调用listener(即传入的ZookeeperRegistry中创建的ChildListener实例)的childChanged(String parentPath, List<String> currentChilds)方法。

6.ZkclientZookeeperClient.addTargetChildListener(String path, final IZkChildListener listener)

@Override
public List<String> addTargetChildListener(String path, final IZkChildListener listener) {
    return client.subscribeChildChanges(path, listener);
}

从上边的分析可以看出,当path节点下的子节点发生变化的时候,会首先调用TargetChildListener(IZkChildListener)handleChildChange(String parentPath, List<String> currentChilds)方法,在该方法中又会调用ChildListener实例的childChanged(String parentPath, List<String> currentChilds)方法,那么来分析一下该方法:

// 监听子节点的变化
public void childChanged(String parentPath, List<String> currentChilds) {
    // 子节点变更通知
    ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}

步骤:

  • 首先获取子节点urls或者是一个empty协议的url

    // 匹配订阅条件,返回数据。无数据,则新建一个empty://协议的url返回
    // 1.首先过滤出providers中与consumer匹配的providerUrl集合;
    // 2.如果providerUrl集合不为空,直接返回这个集合;
    // 3.如果为空,首先从path中获取category,然后将consumer的协议换成empty,添加参数category=configurators。
    // @param consumer provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441(订阅条件)
    // @param path /dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators(父路径)
    // @param providers 空ArrayList(子节点)
    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
        // 目前无数据,则新建一个empty://协议的url
        if (urls.isEmpty()) {
            int i = path.lastIndexOf('/');
            // 类型
            String category = i < 0 ? path : path.substring(i + 1); // configurators
            URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
            urls.add(empty);
        }
        // empty://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=75315&side=provider&timestamp=1550989212512
        return urls;
    }
      
    // 匹配订阅条件,返回匹配的数据,即过滤出providers中与consumer匹配的url集合。
    // @param consumer 订阅条件
    // @param providers 目前有的数据
    // @return 返回跟条件匹配的数据
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList<URL>();
        if (providers != null && providers.size() > 0) {
            for (String provider : providers) {
                provider = URL.decode(provider);
                if (provider.contains("://")) {
                    URL url = URL.valueOf(provider);
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }
        return urls;
    }
    
  • 之后调用ZookeeperRegistry的父类FailbackRegistry.notify(URL url, NotifyListener listener, List<URL> urls)

    // 通知
    // @param url 订阅的url
    // @param listener 订阅的url对应的通知监听器
    // @param urls 变更后,目前的数据
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            // 通知的实际动作
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // 将失败的通知请求记录到失败列表,定时重试
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            if (listeners == null) {
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                listeners = failedNotified.get(url);
            }
            listeners.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }
      
    // 通知的实际动作
    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }
    
    • 说明:这里传入的
      • url(overrideSubscribeUrl)provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
      • listener:之前创建出来的overrideSubscribeListener实例
      • urls[empty://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=75315&side=provider&timestamp=1550989212512]
      • 这里首先执行父类的AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls),如果失败,则获取或创建ConcurrentMap<overrideSubscribeUrl, Map<overrideSubscribeListener实例, urls>> failedNotified,后续做重试。

来看一下通知的最核心部分:

7.AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)

//  根据订阅url,找出匹配的变更数据urls,触发对应的监听器
// @param url 订阅的url
// @param listener 订阅的url对应的通知监听器
// @param urls 通知数据

// 1.首先遍历List<URL> urls,将urls按照category进行分类,存储在Map<"categoryName", List<URL>> result中;
// 2.之后遍历result:(每遍历一个,都是一个新的category)
//  (1)将Map<"categoryName", List<URL>>存储在ConcurrentMap<URL, Map<String, List<URL>>> notified的Map<String, List<URL>>中
//  (2)进行properties设置和文件保存
//  (3)调用传入listener的notify()方法。
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if (urls == null) {
        throw new IllegalArgumentException("notify urls == null");
    }
    if (urls.size() == 0 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // 遍历List<URL> urls,将urls按照category进行分类。<category, 匹配的数据list>
    Map<String, List<URL>> result = new HashMap<String, List<URL>>(16);
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    // <category, List<URL>数据>
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>(16));
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList); // 填充到categoryNotified列表
        // 保存数据到本地磁盘
        saveProperties(url);
        // 通知
        listener.notify(categoryList);
    }
}

说明:这里传入的

  • url(overrideSubscribeUrl)provider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
  • listener:之前创建出来的overrideSubscribeListener实例
  • urls:[ empty://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=75315&side=provider&timestamp=1550989212512 ]

步骤:

  • 首先遍历List<URL> urls,将urls按照category进行分类,存储在Map<"categoryName", List<URL>> result中;
  • 然后根据urlConcurrentMap<URL, Map<String, List<URL>>> notified(<订阅的url,<category, 匹配的变更数据>>)中获取或创建Map<String, List<URL>> categoryNotified(<category, 匹配的变更数据>)
  • 最后遍历Map<"categoryName", List<URL>> result
    • 填充categoryNotified列表;
    • 保存传入的urlProperties properties(本地磁盘缓存)中
    • 调用传入的listenernotify方法(注意:这里调用的正是文章开头创建的overrideSubscribeListener实例的notify方法)

8.AbstractRegistry.saveProperties(URL url)

// 保存数据到本地磁盘
1.根据url从ConcurrentMap<URL, Map<String, List<URL>>> notified中获取Map<String, List<URL>> categoryNotified之后将所有category的list组成一串字符串buf(以空格分隔)
2.<serviceKey, buf>写入本地磁盘缓存中Properties properties
3.将AtomicLong lastCacheChanged加1
4.之后根据syncSaveFile判断是同步保存properties到文件还是异步保存properties到文件
// @param url 订阅url
private void saveProperties(URL url) {
    if (file == null) {
        return;
    }

    try {
        StringBuilder buf = new StringBuilder();
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified != null) {
            for (List<URL> us : categoryNotified.values()) {
                for (URL u : us) {
                    if (buf.length() > 0) {
                        buf.append(URL_SEPARATOR);
                    }
                    buf.append(u.toFullString());
                }
            }
        }
        // 服务关键字([group/]interface[:version]), 数据
        properties.setProperty(url.getServiceKey(), buf.toString());
        long version = lastCacheChanged.incrementAndGet();
        // 同步保存文件
        if (syncSaveFile) {
            doSaveProperties(version);
        } else {
            // 异步保存文件
            registryCacheExecutor.execute(new SaveProperties(version));
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

说明:

  • 入参urlprovider://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=73422&side=provider&timestamp=1550973009441
  • properties{ "com.alibaba.dubbo.demo.DemoService" -> "empty://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=75315&side=provider&timestamp=1550989212512"}
  • 最后采用异步线程将properties中的内容写入到文件中

9.AbstractRegistry$SaveProperties类

private class SaveProperties implements Runnable {
    private long version;

    private SaveProperties(long version) {
        this.version = version;
    }

    @Override
    public void run() {
        doSaveProperties(version);
    }
}

9.AbstractRegistry.doSaveProperties(long version)

// 将注册数据缓存保存到本地属性文件
// @param version 本次本地缓存变更版本
private void doSaveProperties(long version) {
    // 已经有其他线程执行了saveProperties(URL url),马上就要执行doSaveProperties(long version),所以当前线程放弃操作,让后边的这个线程来做保存操作。
    if (version < lastCacheChanged.get()) {
        return;
    }
    if (file == null) {
        return;
    }
    // 保存
    try {
        File lockFile = new File(file.getAbsolutePath() + ".lock");
        if (!lockFile.exists()) {
            // 创建lock文件
            lockFile.createNewFile();
        }
        RandomAccessFile raf = new RandomAccessFile(lockFile, "rw");
        try {
            FileChannel channel = raf.getChannel();
            try {
                FileLock lock = channel.tryLock();
                if (lock == null) {
                    throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                }
                // 保存
                try {
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    FileOutputStream outputFile = new FileOutputStream(file);
                    try {
                        properties.store(outputFile, "Dubbo Registry Cache");
                    } finally {
                        outputFile.close();
                    }
                } finally {
                    lock.release();
                }
            } finally {
                channel.close();
            }
        } finally {
            raf.close();
        }
    } catch (Throwable e) {
        if (version < lastCacheChanged.get()) {
            return;
        } else {
            registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
        }
        logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
    }
}

这里有一个version,实际上是一个CAS判断,在saveProperties(URL url)方法中执行了long version = lastCacheChanged.incrementAndGet();之后,在doSaveProperties(long version)进行if (version < lastCacheChanged.get())判断,如果满足这个条件,说明当前线程在进行doSaveProperties(long version)时,已经有其他线程执行了saveProperties(URL url),马上就要执行doSaveProperties(long version),所以当前线程放弃操作,让后边的这个线程来做保存操作。

保存操作执行之后,会在文件夹/Users/Jay/.dubbo下生成两个文件:

  • dubbo-registry-demo-provider-localhost:2181.cache
  • dubbo-registry-demo-provider-localhost:2181.cache.lock

前者的内容:

#Dubbo Registry Cache
#Sun Feb 24 15:07:47 CST 2019
com.alibaba.dubbo.demo.DemoService=empty\://172.16.132.166\:20881/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category\=configurators&cellinvokemode\=sharing&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=75315&side\=provider&timestamp\=1550989212512

最后就是调用OverrideListenernotify方法:

10.OverrideListener.notify(List<URL> urls)

// 如果提供者url变更,则重新export
 1.protocol中的exporter destroy问题
 2.要求RegistryProtocol返回的exporter可以正常destroy
 3.notify后不需要重新向注册中心注册
 4.export 方法传入的invoker最好能一直作为exporter的invoker.
private class OverrideListener implements NotifyListener {
    // 订阅服务提供者的动态配置数据(订阅条件)
    private final URL subscribeUrl;
    // DelegateProviderMetaDataInvoker实例(包装AbstractProxyInvoker实例)
    private final Invoker originInvoker;

    public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
        this.subscribeUrl = subscribeUrl;
        this.originInvoker = originalInvoker;
    }

    // 对原本注册了的providerUrl进行校验,如果url发生了变化,那么要重新export
    // @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
    @Override
    public synchronized void notify(List<URL> urls) {
        if (logger.isDebugEnabled()) {
            logger.debug("original override urls: " + urls);
        }
        // 根据订阅条件,获取匹配的覆盖配置数据
        List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
        if (logger.isDebugEnabled()) {
            logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
        }
        // 没有匹配的
        if (matchedUrls.isEmpty()) {
            return;
        }

        // 覆盖规则url转为Configurator列表  本次调用,这里是一个空列表
        List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);
		// DelegateProviderMetaDataInvoker实例(包装AbstractProxyInvoker实例)
        final Invoker<?> invoker; 
        if (originInvoker instanceof InvokerDelegete) {
            invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
        } else {
            invoker = originInvoker;
        }
        // 最原始的invoker
        // 原来的提供者URL:
        // dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=172.16.132.166&bind.port=20881&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76006&side=provider&timestamp=1550992480278
        URL originUrl = getProviderUrl(invoker);
        // key: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=172.16.132.166&bind.port=20881&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76006&side=provider&timestamp=1550992480278
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<?> exporter = bounds.get(key); // 在doLocalExport方法中已经存放在这里了
        if (exporter == null) {
            logger.warn(new IllegalStateException("error state, exporter should not be null"));
            return;
        }
        // 当前的提供者url,可能经过了多次merge。
        // currentUrl: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=172.16.132.166&bind.port=20881&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76006&side=provider&timestamp=1550992480278
        URL currentUrl = exporter.getInvoker().getUrl(); 
        // 与本次配置merge之后的url
        // newUrl: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=172.16.132.166&bind.port=20881&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=76006&side=provider&timestamp=1550992480278
        URL newUrl = getConfiguredInvokerUrl(configurators, originUrl);
        if (!currentUrl.equals(newUrl)) { 
            // 如果不相等,重新export
            doChangeLocalExport(originInvoker, newUrl);
            logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl
                    + ", new export url: " + newUrl);
        }
    }

    // 根据订阅条件,获取匹配的覆盖配置数据
    // @param configuratorUrls 覆盖配置数据
    // @param currentSubscribe 订阅条件
    private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
        List<URL> result = new ArrayList<URL>();
        for (URL url : configuratorUrls) {
            URL overrideUrl = url;
            // 兼容旧版本
            if (url.getParameter(Constants.CATEGORY_KEY) == null
                    && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
            }

            // 检查订阅条件和配置数据是否匹配,即是不是要应用到当前服务上
            if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
                result.add(url);
            }
        }
        return result;
    }

    // 合并配置的url
    private URL getConfiguredInvokerUrl(List<Configurator> configurators, URL url) {
        for (Configurator configurator : configurators) {
            url = configurator.configure(url);
        }
        return url;
    }
}

最后总结一下:

当前的provider订阅了/dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators,当其下的子节点发生变化(url)时,通知到provider,使得providerUrl发生变化,则提供者需要重新暴露

重新暴露:

// 对修改了url的invoker重新export。
// @param originInvoker 原始的服务提供者执行体 DelegateProviderMetaDataInvoker实例
// @param newInvokerUrl 新的服务提供者url dubbo://....
private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
    String key = getCacheKey(originInvoker); // dubbo://....
    final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
    if (exporter == null) {
        logger.warn(new IllegalStateException("error state, exporter should not be null"));
    } else {
        // originInvoker不变
        final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
        // protocol.export(invokerDelegete) 重新export
        exporter.setExporter(protocol.export(invokerDelegete));
    }
}