Dubbo Service Reference: Source Code Analysis of Building the Client

A walkthrough of the full consumer initialization process

Posted by Jay on March 3, 2019

Preparation:

Start a provider:

dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453

[Figure: inheritance and implementation hierarchy of ReferenceBean]

When DemoService demoService = (DemoService) context.getBean("demoService") runs, the bean is obtained through FactoryBean.getObject(), because ReferenceBean is a FactoryBean.

The core code of ReferenceBean:

// ReferenceBean.getObject(): returns the service proxy instance
public Object getObject() throws Exception {
    return get();
}

// ReferenceConfig.get(): returns the service proxy instance, creating it on first use
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

// ReferenceConfig.init(): initialization
private void init() {
    ...
    ref = createProxy(map); // create the proxy
}

// ReferenceConfig.createProxy(): creates the service proxy
private T createProxy(Map<String, String> map) {
    ...
    if (urls.size() == 1) {
        // urls.get(0), decoded:
        // registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
        invoker = refprotocol.refer(interfaceClass, urls.get(0)); // refer to the service ###1
    }
    ...
    // create the service proxy
    return (T) proxyFactory.getProxy(invoker); // ###2
}

The two lines marked ###1 and ###2 above are the core of the whole process.
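To make the two steps concrete, here is a minimal sketch of the same shape: a lazily initialized reference that first obtains an invoker and then wraps it in a proxy. SketchInvoker, GreetingService and LazyReferenceSketch are hypothetical names invented for this illustration, and the JDK dynamic proxy is an assumption standing in for whatever ProxyFactory extension Dubbo actually selects.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical stand-in for Dubbo's Invoker; not the real interface.
interface SketchInvoker {
    Object invoke(String methodName, Object[] args);
}

// Hypothetical service interface used only for this demo.
interface GreetingService {
    String sayHello(String name);
}

class LazyReferenceSketch<T> {
    private final Class<T> interfaceClass;
    private T ref;
    int initCount; // counts how often the proxy is built, for the demo only

    LazyReferenceSketch(Class<T> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    // Like ReferenceConfig.get(): build the proxy on first use, then reuse it.
    synchronized T get() {
        if (ref == null) {
            initCount++;
            SketchInvoker invoker = refer();   // step ###1: interface -> invoker
            ref = createProxy(invoker);        // step ###2: invoker -> proxy
        }
        return ref;
    }

    // Stand-in for refprotocol.refer(...); a real invoker would make a remote call.
    private SketchInvoker refer() {
        return (methodName, args) -> methodName + Arrays.toString(args) + " via invoker";
    }

    // Stand-in for proxyFactory.getProxy(invoker), here a JDK dynamic proxy.
    private T createProxy(SketchInvoker invoker) {
        InvocationHandler handler =
                (p, method, args) -> invoker.invoke(method.getName(), args);
        Object instance = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, handler);
        return interfaceClass.cast(instance);
    }
}
```

Calling get() twice on the same LazyReferenceSketch returns the same proxy object, which is the behaviour the ref == null check in ReferenceConfig.get() provides.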

1. Using Protocol to turn interfaceClass into an Invoker

invoker = refprotocol.refer(interfaceClass, urls.get(0)); // refer to the service ###1

Here refprotocol is a Protocol$Adaptive instance.

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    ...
    public com.alibaba.dubbo.rpc.Invoker refer(Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
    ...
}

Here extName = "registry". The call then passes through ProtocolListenerWrapper.refer -> ProtocolFilterWrapper.refer -> RegistryProtocol.refer. For the registry protocol the two wrappers do nothing but delegate. The core of RegistryProtocol.refer:
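The adaptive dispatch above boils down to "read the protocol from the url, look up the extension with that name, delegate". A minimal sketch of that idea follows; SketchProtocol and AdaptiveProtocolSketch are hypothetical names, and a plain HashMap is an assumption standing in for Dubbo's ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Protocol extension point.
interface SketchProtocol {
    String refer(String url);
}

class AdaptiveProtocolSketch implements SketchProtocol {
    // name -> extension; Dubbo resolves this through ExtensionLoader instead
    private final Map<String, SketchProtocol> extensions = new HashMap<>();

    void register(String name, SketchProtocol extension) {
        extensions.put(name, extension);
    }

    // Extract the protocol part of the url, falling back to "dubbo" like the generated code.
    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? "dubbo" : url.substring(0, i);
    }

    @Override
    public String refer(String url) {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        String extName = protocolOf(url);
        SketchProtocol extension = extensions.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + extName);
        }
        return extension.refer(url); // delegate to the selected extension
    }
}
```

With extensions registered under "registry" and "dubbo", a url starting with registry:// is routed to the former, which mirrors how the url in ###1 ends up in RegistryProtocol.refer.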

// Refer to the remote service
// url: registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
// type: interface com.alibaba.dubbo.demo.DemoService
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // url: zookeeper://....
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // connect to the ZK registry
    Registry registry = registryFactory.getRegistry(url);
    ...
    // refer to the service
    return doRefer(cluster, registry, type, url);
}

Parameters:

  • url: registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=18793&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18793&register.ip=172.16.132.166&side=consumer&timestamp=1551526492998&registry=zookeeper&timestamp=1551526495185
  • type: interface com.alibaba.dubbo.demo.DemoService

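After the first statement runs, the protocol has been replaced: registry becomes the value of the registry parameter, and that parameter is then removed. The url is now: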

zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19009&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19009&register.ip=172.16.132.166&side=consumer&timestamp=1551527300192&timestamp=1551527302398
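The protocol swap can be sketched with plain string handling. This is a simplified illustration, not Dubbo's URL class: RegistryUrlRewriteSketch is a hypothetical name, and the sketch assumes the query string contains no nested, unencoded parameters (in a real url the refer value is URL-encoded; the listings in this post show it decoded).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class RegistryUrlRewriteSketch {
    // Replace the protocol with the value of the "registry" parameter (default "dubbo")
    // and drop that parameter, as the first statement of RegistryProtocol.refer does.
    static String toRegistryUrl(String url) {
        int schemeEnd = url.indexOf("://");
        int queryStart = url.indexOf('?');
        String hostAndPath = queryStart < 0
                ? url.substring(schemeEnd + 3)
                : url.substring(schemeEnd + 3, queryStart);

        // Keep parameter order stable so the output is easy to compare.
        Map<String, String> params = new LinkedHashMap<>();
        if (queryStart >= 0) {
            for (String pair : url.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    params.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }

        String registry = params.remove("registry");
        String protocol = registry == null ? "dubbo" : registry;

        StringJoiner query = new StringJoiner("&");
        params.forEach((k, v) -> query.add(k + "=" + v));
        return protocol + "://" + hostAndPath + (params.isEmpty() ? "" : "?" + query);
    }
}
```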

Next the Registry is obtained. Here registryFactory is a RegistryFactory$Adaptive instance.

public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
    public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
        if (arg0 == null)
            throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        // zookeeper
        String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); 
        if(extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
        return extension.getRegistry(arg0);
    }
}

Here extName is zookeeper. AbstractRegistryFactory.getRegistry, inherited by ZookeeperRegistryFactory, runs next:

// url: registry address, must not be null; zookeeper://....
@Override
public Registry getRegistry(URL url) {
    url = url.setPath(RegistryService.class.getName())
            .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    // string form of the registry url, used as the cache key
    String key = url.toServiceString();
    // lock the lookup so that only one registry instance exists per key
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        // not cached yet: create the ZookeeperRegistry instance
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // release the lock
        LOCK.unlock();
    }
}

The url after this processing:

zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&interface=com.alibaba.dubbo.registry.RegistryService&pid=19009&timestamp=1551527302398
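The getRegistry logic is a lock-guarded cache: one registry instance per key, created on first request. A minimal sketch of that pattern, where RegistryCacheSketch and its factory function are hypothetical names standing in for AbstractRegistryFactory and createRegistry:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class RegistryCacheSketch<R> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, R> registries = new HashMap<>();
    private final Function<String, R> factory; // stand-in for createRegistry(url)

    RegistryCacheSketch(Function<String, R> factory) {
        this.factory = factory;
    }

    // Return the cached registry for this key, creating it at most once.
    R getRegistry(String key) {
        lock.lock();
        try {
            R registry = registries.get(key);
            if (registry != null) {
                return registry;
            }
            registry = factory.apply(key);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + key);
            }
            registries.put(key, registry);
            return registry;
        } finally {
            lock.unlock(); // always released, even if creation throws
        }
    }
}
```

Because the key is derived from the registry address and service name rather than from the consumer-specific parameters, several references pointing at the same registry share one connection.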

Then ZookeeperRegistryFactory.createRegistry(URL url) is called:

// Connect to the Zookeeper registry
// url: zookeeper://...
@Override
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

Here zookeeperTransporter is a ZookeeperTransporter$Adaptive instance.

// url: registry url, zookeeper://....
// zookeeperTransporter: used to connect to ZK; a ZookeeperTransporter$Adaptive instance
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // ZK root node
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // connect to ZK here
    zkClient = zookeeperTransporter.connect(url);
    // add a listener for ZK connection state changes
    zkClient.addStateListener(new StateListener() {
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    recover(); // restore registrations and subscriptions
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

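The super(url) call runs the constructor of FailbackRegistry, the parent of ZookeeperRegistry, which starts the scheduled retry task for failures (failed registration, unregistration, subscription, unsubscription and notification), and the constructor of AbstractRegistry, which writes the information to a properties file and issues the corresponding notifications. No url has been subscribed and no listener registered at this point, so the latter does nothing of note.

Next it connects to the Zookeeper registry and obtains a ZookeeperClient (a ZkclientZookeeperClient instance). Finally it adds a listener for ZK connection state changes, which restores registrations and subscriptions after a successful reconnect.

In zookeeperTransporter.connect(url), extName is zkclient, so ZkclientZookeeperTransporter.connect runs:

// Connect to zookeeper with Zkclient
// url: registry url, zookeeper://....
// returns a ZkclientZookeeperClient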
@Override
public ZookeeperClient connect(URL url) {
    return new ZkclientZookeeperClient(url);
}
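// Connect to zookeeper with Zkclient
// url: registry url, zookeeper://...
ZkclientZookeeperClient(URL url) {
    super(url);
    // build a ZkClientWrapper, which creates (but does not yet run) the ListenableFutureTask that connects to ZK
    client = new ZkClientWrapper(url.getBackupAddress(), 30000);
    // attach a listener to that ListenableFutureTask: once the task completes, i.e. once ZkClient has connected,
    // subscribe to ZK connection state changes (disconnected / connected / reconnected).
    // In practice only the reconnected event is acted on, by the StateListener that calls recover() in the ZookeeperRegistry constructor.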
    client.addListener(new IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {
            ZkclientZookeeperClient.this.state = state;

            if (state == KeeperState.Disconnected) {
                stateChanged(StateListener.DISCONNECTED);
            } else if (state == KeeperState.SyncConnected) {
                stateChanged(StateListener.CONNECTED);
            }
        }

        @Override
        public void handleNewSession() throws Exception {
            stateChanged(StateListener.RECONNECTED);
        }
    });
    client.start(); // run the connection task; the resulting ZkClient is stored inside the wrapper
}

Here client is a ZkClientWrapper instance. ZkClientWrapper.start() and the methods around it:

private ListenableFutureTask<ZkClient> listenableFutureTask;

// Constructor, called as new ZkClientWrapper(url.getBackupAddress(), 30000)
// Creates the task that connects to ZK
// serverAddr: ZK cluster address
// timeout: connection timeout
public ZkClientWrapper(final String serverAddr, long timeout) {
    this.timeout = timeout; // connection timeout
    // create the task that connects to ZK
    listenableFutureTask = ListenableFutureTask.create(new Callable<ZkClient>() {
        @Override
        public ZkClient call() throws Exception {
            // connect to ZK
            return new ZkClient(serverAddr, Integer.MAX_VALUE);
        }
    });
}

// Attach a listener to the ListenableFutureTask: once the task completes, i.e. once ZkClient has connected, subscribe to ZK connection state changes
// listener: ZK connection state listener
public void addListener(final IZkStateListener listener) {
    listenableFutureTask.addListener(new Runnable() {
        @Override
        public void run() {
            try {
                client = listenableFutureTask.get(); // task result, i.e. the ZkClient instance
                client.subscribeStateChanges(listener); // subscribe to ZK connection state changes
            } catch (InterruptedException e) {
                logger.warn(Thread.currentThread().getName() + " was interrupted unexpectedly, which may cause unpredictable exception!");
            } catch (ExecutionException e) {
                logger.error("Got an exception when trying to create zkclient instance, can not connect to zookeeper server, please check!", e);
            }
        }
    });
}

// Connect to ZK on a separate thread and store the resulting ZkClient in the client field
public void start() {
    if (!started) {
        // start a separate thread
        Thread connectThread = new Thread(listenableFutureTask);
        connectThread.setName("DubboZkclientConnector");
        connectThread.setDaemon(true);
        connectThread.start();
        try {
            // wait for the result, up to the timeout
            client = listenableFutureTask.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            // timeout or other failure: only logged, nothing is rethrown
            logger.error("Timeout! zookeeper server can not be connected in : " + timeout + "ms!", t);
        }
        started = true;
    } else {
        logger.warn("Zkclient has already been started!");
    }
}

This is where the connection to Zookeeper is made and the ZkClient instance obtained.
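The connect-with-timeout pattern used by ZkClientWrapper can be reproduced with the JDK alone. The sketch below is an approximation under stated assumptions: BackgroundConnectSketch is a hypothetical name, a plain FutureTask replaces Dubbo's ListenableFutureTask, and the connector is any Callable rather than a real ZkClient constructor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BackgroundConnectSketch<C> {
    private final FutureTask<C> task;
    private final long timeoutMillis;
    private volatile C client;
    private boolean started;

    BackgroundConnectSketch(Callable<C> connector, long timeoutMillis) {
        this.task = new FutureTask<>(connector); // created here, run later in start()
        this.timeoutMillis = timeoutMillis;
    }

    // Run the connector on a daemon thread and wait for it up to the timeout.
    synchronized void start() {
        if (started) {
            return;
        }
        Thread connectThread = new Thread(task, "SketchConnector");
        connectThread.setDaemon(true);
        connectThread.start();
        try {
            client = task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } catch (ExecutionException | TimeoutException e) {
            // As in the listing above, failure is not rethrown; client stays null.
        }
        started = true;
    }

    C client() {
        return client;
    }
}
```

One consequence worth noticing: because the timeout is swallowed, callers have to cope with a client that is still null after start() returns.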

The ZK connection state listener is then added, which completes the creation of the Registry. Back to the core of RegistryProtocol.refer:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // url: zookeeper://....
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY))
            .removeParameter(Constants.REGISTRY_KEY);
    // connect to the ZK registry
    Registry registry = registryFactory.getRegistry(url);
    ...
    // refer to the service
    return doRefer(cluster, registry, type, url);
}

The last statement runs next:

// cluster: Cluster$Adaptive; registry: ZookeeperRegistry; type: interface com.alibaba.dubbo.demo.DemoService
// url: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // the registry directory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry); // set the registry
    directory.setProtocol(protocol); // set the protocol
    // all attributes under REFER_KEY
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // temporary url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
            && url.getParameter(Constants.REGISTER_KEY, true)) {
        // register the consumer configuration with the registry
        // url registered for the consumer: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
        registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
    }
    // url subscribed by the consumer: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers,cells&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&side=consumer&timestamp=1551529753690
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    // a MockClusterInvoker instance
    Invoker<T> invoker = cluster.join(directory);
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}

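Overall steps:

  • first, create the RegistryDirectory instance;
  • then register the consumer url with ZK;
  • then subscribe and listen (this is where the first service discovery happens, the long-lived connection is established and the Netty client is created);
  • finally, wrap the RegistryDirectory instance in a MockClusterInvoker.

First comes the creation of the RegistryDirectory. The finished instance looks like this: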

-->List<Router> routers: [MockInvokersSelector实例]
-->Registry registry: the ZookeeperRegistry instance described above (zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&interface=com.alibaba.dubbo.registry.RegistryService&pid=19549&timestamp=1551529755848)
-->String serviceKey: dubbo_test/com.alibaba.dubbo.registry.RegistryService
-->String[] serviceMethods: [sayHello]
-->Class<T> serviceType: interface com.alibaba.dubbo.demo.DemoService
-->URL url: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
-->URL consumerUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&client=zkclient&dubbo=2.0.0&group=dubbo_test&pid=19549&refer=application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690&timestamp=1551529755848
-->URL directoryUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690
-->URL overrideDirectoryUrl: zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19549&register.ip=172.16.132.166&side=consumer&timestamp=1551529753690
-->Map<String, String> queryMap: {side=consumer, application=demo-consumer, register.ip=172.16.132.166, methods=sayHello, dubbo=2.0.0, pid=19549, check=false, interface=com.alibaba.dubbo.demo.DemoService, timestamp=1510225913509}

List<Router> routers is built in AbstractDirectory, the parent class of RegistryDirectory:

public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    this.url = url;
    this.consumerUrl = consumerUrl;
    setRouters(routers);
}

protected void setRouters(List<Router> routers) {
    // copy list
    routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
    // append url router
    String routerKey = url.getParameter(Constants.ROUTER_KEY);
    if (routerKey != null && routerKey.length() > 0) {
        RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerKey);
        routers.add(routerFactory.getRouter(url));
    }
    // append mock invoker selector
    routers.add(new MockInvokersSelector());
    Collections.sort(routers);
    this.routers = routers;
}

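Next the consumer is registered with the registry, in the same way as a service provider. FailbackRegistry.register runs first and internally calls doRegister() of its subclass ZookeeperRegistry. If registration fails, the url is added to the failed-registration list and re-registered later by the background retry task.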

// url: the consumer url
public void register(URL url) {
    if (destroyed.get()){
        return;
    }
    super.register(url);
    failedRegistered.remove(url);
    failedUnregistered.remove(url);
    try {
        // send the registration request to the server
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // if check-on-startup is enabled, throw immediately
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        // skip failback retry
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // record the failed registration request for periodic retry
        failedRegistered.add(url);
    }
}
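The failback behaviour in register can be reduced to a few lines: try once, and on failure either fail fast (when checking is on) or remember the url for a later retry. The sketch below is a simplified model, not Dubbo code: FailbackRegisterSketch is a hypothetical name, the check flag is passed in directly instead of being derived from url parameters, and retry() is invoked by hand where Dubbo uses a scheduled task.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class FailbackRegisterSketch {
    private final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();
    private final Consumer<String> doRegister; // stand-in for ZookeeperRegistry.doRegister

    FailbackRegisterSketch(Consumer<String> doRegister) {
        this.doRegister = doRegister;
    }

    void register(String url, boolean check) {
        failedRegistered.remove(url);
        try {
            doRegister.accept(url);
        } catch (RuntimeException e) {
            if (check) {
                // fail fast: the url is NOT queued for retry
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url); // queued for the next retry round
        }
    }

    // What the background retry task would do on each tick.
    void retry() {
        for (String url : failedRegistered) {
            try {
                doRegister.accept(url);
                failedRegistered.remove(url);
            } catch (RuntimeException e) {
                // still failing: keep it for the next round
            }
        }
    }

    Set<String> failed() {
        return failedRegistered;
    }
}
```

In the listing above, check is forced to false for urls whose protocol is consumer, so a consumer that cannot register at startup is always queued for retry rather than failing the application.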

Finally, the doRegister method of ZookeeperRegistry:

// url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
@Override
protected void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

This creates an ephemeral node on ZK at toUrlPath(url):

/dubbo_test/com.alibaba.dubbo.demo.DemoService/consumers/consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=consumers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
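The path above is shown decoded for readability. As far as I can tell from the Dubbo 2.x sources, the last path segment is the URL-encoded full url, since a raw url contains slashes that ZK would treat as path separators. A small sketch of how such a path could be assembled, where ZkPathSketch and its parameters are hypothetical names and java.net.URLEncoder is an assumption standing in for Dubbo's own URL.encode:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ZkPathSketch {
    // root/serviceInterface/category/<encoded full url>
    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            return root + "/" + serviceInterface + "/" + category + "/"
                    + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a conforming JVM
            throw new IllegalStateException(e);
        }
    }
}
```

For example, a url beginning with consumer:// ends up as a node name beginning with consumer%3A%2F%2F under /dubbo_test/com.alibaba.dubbo.demo.DemoService/consumers.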

The consumer is now registered. Next, directory.subscribe performs the subscription. RegistryDirectory.subscribe(URL url):

// Consumer subscription
// url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers,cells&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
public void subscribe(URL url) {
    setConsumerUrl(url);
    // this RegistryDirectory instance acts as the listener; the call lands in FailbackRegistry.subscribe
    registry.subscribe(url, this);
}

Core of FailbackRegistry.subscribe(URL url, NotifyListener listener):

public void subscribe(URL url, NotifyListener listener) {
    if (destroyed.get()){
        return;
    }
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // send the subscription request to the server
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;
        // look up cached data for the subscribed url
        List<URL> urls = getCacheUrls(url);
        if (urls != null && urls.size() > 0) {
            // notify with the cached urls
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // if check-on-startup is enabled, throw immediately
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }
        }

        // record the failed subscription request for periodic retry
        addFailedSubscribed(url, listener);
    }
}

ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener):

// subscribed url: consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
// listener: the RegistryDirectory instance
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            ...
        } else {
            // execution reaches this branch
            List<URL> urls = new ArrayList<URL>();
            for (String path : toCategoriesPath(url)) {
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

The for loop here runs three times, once per category path:

  • /dubbo_test/com.alibaba.dubbo.demo.DemoService/providers
  • /dubbo_test/com.alibaba.dubbo.demo.DemoService/configurators
  • /dubbo_test/com.alibaba.dubbo.demo.DemoService/routers
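These three paths come from splitting the category parameter of the subscribed url. A minimal sketch of that expansion, with CategoryPathSketch as a hypothetical name and the root, interface and category values passed in directly rather than read from a Dubbo URL object:

```java
import java.util.ArrayList;
import java.util.List;

class CategoryPathSketch {
    // One ZK path per comma-separated category: root/serviceInterface/category
    static List<String> toCategoriesPath(String root, String serviceInterface, String categoryParam) {
        List<String> paths = new ArrayList<>();
        for (String category : categoryParam.split(",")) {
            paths.add(root + "/" + serviceInterface + "/" + category.trim());
        }
        return paths;
    }
}
```

Each resulting path gets its own child listener, which is why a change under any of the three directories triggers a notification.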

After the loop finishes, the state is as follows:

ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners

{
consumer://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=providers,configurators,routers&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
=
{RegistryDirectory instance=anonymous ChildListener inner class instance from ZookeeperRegistry}
}

List<URL> urls (3 elements):

[
dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453, 

empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219, 

empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
]

Note: the first element comes from List<String> children = zkClient.addChildListener(path, zkListener), which returns the nodes currently under path. This is effectively the first service discovery.

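Execution then continues to AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls):

// Given the subscribed url, find the matching changed urls and trigger the corresponding listener
// @param url the subscribed url
// @param listener the notify listener for the subscribed url
// @param urls the current data after the change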
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if (urls == null) {
        throw new IllegalArgumentException("notify urls == null");
    }
    if (urls.size() == 0 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // <category, list of matching changed urls>
    Map<String, List<URL>> result = new HashMap<String, List<URL>>(16);
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
            List<URL> categoryList = result.get(category);
            if (categoryList == null) {
                categoryList = new ArrayList<URL>();
                result.put(category, categoryList);
            }
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    // <category, matching changed urls>
    Map<String, List<URL>> categoryNotified = notified.get(url);
    if (categoryNotified == null) {
        notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>(16));
        categoryNotified = notified.get(url);
    }
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        // save the data to local disk
        saveProperties(url);
        // notify the change
        listener.notify(categoryList); // this is where the RegistryDirectory listener is notified
    }
}

First, a for loop groups the incoming url list by category. The result:

Map<String, List<URL>> result

{
configurators=[
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=configurators&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
], 

routers=[
empty://172.16.132.166/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&category=routers&cellinvokemode=sharing&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=19878&side=consumer&timestamp=1551531392219
], 

providers=[
dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453
]
}
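The grouping step can be sketched on plain strings. Note how the provider url carries no category parameter yet lands under providers: that is the default category. CategoryGroupingSketch is a hypothetical name, urls are handled as strings rather than Dubbo URL objects, and the UrlUtils.isMatch filter from the listing above is deliberately left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CategoryGroupingSketch {
    // Read the category parameter, defaulting to "providers" when it is absent.
    static String categoryOf(String url) {
        int q = url.indexOf('?');
        if (q >= 0) {
            for (String pair : url.substring(q + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers";
    }

    // <category, urls in that category>, in first-seen order.
    static Map<String, List<String>> groupByCategory(List<String> urls) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String u : urls) {
            result.computeIfAbsent(categoryOf(u), k -> new ArrayList<>()).add(u);
        }
        return result;
    }
}
```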

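The second for loop then iterates over result, saving to file and notifying for each entry. The first two entries do nothing essential, so go straight to the notification for the providers entry: RegistryDirectory.notify(List<URL> urls), where urls is the providers value above.

// Service data changes are delivered as full notifications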
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol(); // protocol
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); // category
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: "
                    + url + " from registry " + getUrl().getAddress() + " to consumer "
                    + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}

The incoming provider url is first placed in the invokerUrls list, and then refreshInvoker(invokerUrls) is called:

// Map<url of the service referenced by the consumer, RegistryDirectory.InvokerDelegete>
private volatile Map<String, Invoker<T>> urlInvokerMap;
// Map<methodName, List<RegistryDirectory.InvokerDelegete>>
private volatile Map<String, List<Invoker<T>>> methodInvokerMap;

// Refresh the invoker maps
private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
        && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // forbid access
        this.methodInvokerMap = null; // clear the map
        destroyAllInvokers(); // close all invokers
    } else {
        this.forbidden = false; // allow access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls); // cache the invokerUrls list so later notifications can be compared against it
        }
        if (invokerUrls.size() == 0) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// convert the URL list into invokers
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // map each method name to its list of invokers
        // state change
        // if the conversion failed, leave the current state untouched
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // multiple groups
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // destroy invokers that are no longer used
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
// convert urls into invokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.size() == 0) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // if the reference side configured a protocol, accept only providers whose protocol matches
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            // this provider url does not match: skip to the next one
            if (!accept) {
                continue;
            }
        }
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // check whether the provider url's protocol is supported
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                                                   + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        // providerUrl: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&cellinvokemode=sharing&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=18594&side=provider&timestamp=1551525684453
        // url: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
        URL url = mergeUrl(providerUrl); // merge consumer-side (and other) parameters into the provider url

        // key: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
        String key = url.toFullString(); // URL parameters are sorted (this key is the string form of the url after the configuration merge)
        if (keys.contains(key)) { // duplicate URL
            continue;
        }
        keys.add(key);
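        // (original source comment) the cache key is the URL before consumer-side parameters are merged; however the consumer merges parameters, a change in the provider URL triggers a new refer
        // in the code as quoted here, though, the key is the merged URL string computed above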
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        if (invoker == null) { // not in the cache: refer again
            try {
                boolean enabled = true; // the enabled/disabled parameters control whether the provider is referred
                if (url.hasParameter(Constants.DISABLED_KEY)) {
                    enabled = !url.getParameter(Constants.DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // serviceType: Class<com.alibaba.dubbo.demo.DemoService>
                    invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if (invoker != null) { // put the new reference into the cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

This loop iterates over urls (the provider URLs). protocol is a Protocol$Adaptive instance, so the call again goes through ProtocolListenerWrapper -> ProtocolFilterWrapper -> DubboProtocol. Here is the ProtocolFilterWrapper part:

public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // registry protocol
    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
        return protocol.refer(type, url);
    }
    // protocol.refer(type, url): a DubboInvoker instance
    return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY,
            Constants.CONSUMER);
}

The two constants are reference.filter and consumer. Finally, look at DubboProtocol.refer:

// Refer a service
// @param serviceType business interface: com.alibaba.dubbo.demo.DemoService
// @param url  URL of the remote service (the provider url after merging consumer parameters): dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}

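getClients runs first: it creates the Netty client and opens a long-lived connection to the server. The clients are then wrapped in a DubboInvoker, which is returned. On the way back up, the DubboInvoker is wrapped in the Filter chain, the filtered invoker is wrapped in a ListenerInvokerWrapper, and that wrapper is in turn wrapped in an InvokerDelegete. The InvokerDelegete instance is then put into the newUrlInvokerMap cache, which is all that toInvokers(List<URL> urls) does. Next, newUrlInvokerMap is converted into the Map<String, List<Invoker<T>>> newMethodInvokerMap cache, which completes refreshInvoker(List<URL> invokerUrls). Once that is done, the subscription notification has been fully handled.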
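The caching behaviour of toInvokers and destroyUnusedInvokers can be sketched in isolation. This is a simplified illustration, not Dubbo's code: CachedInvoker and RefreshSketch are hypothetical names, a plain string stands in for the merged URL key, and the empty-list and empty-protocol branches of refreshInvoker are left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal invoker: it only remembers whether destroy() was called.
final class CachedInvoker {
    final String url;
    boolean destroyed;

    CachedInvoker(String url) {
        this.url = url;
    }

    void destroy() {
        destroyed = true;
    }
}

final class RefreshSketch {
    private Map<String, CachedInvoker> urlInvokerMap = new HashMap<String, CachedInvoker>();
    int referCount; // how many invokers had to be created from scratch

    Map<String, CachedInvoker> refresh(List<String> urlKeys) {
        Map<String, CachedInvoker> oldMap = urlInvokerMap;
        Map<String, CachedInvoker> newMap = new HashMap<String, CachedInvoker>();
        for (String key : urlKeys) {
            CachedInvoker invoker = oldMap.get(key); // reuse when the key is already cached
            if (invoker == null) {
                invoker = new CachedInvoker(key);    // stands in for protocol.refer(...)
                referCount++;
            }
            newMap.put(key, invoker);
        }
        urlInvokerMap = newMap;
        // destroy every invoker whose key disappeared from the new map
        for (Map.Entry<String, CachedInvoker> entry : oldMap.entrySet()) {
            if (!newMap.containsKey(entry.getKey())) {
                entry.getValue().destroy();
            }
        }
        return newMap;
    }

    public static void main(String[] args) {
        RefreshSketch sketch = new RefreshSketch();
        sketch.refresh(Arrays.asList("dubbo://a:20881", "dubbo://b:20881"));
        sketch.refresh(Arrays.asList("dubbo://b:20881", "dubbo://c:20881"));
        System.out.println("refer calls: " + sketch.referCount);
    }
}
```

Because the key is the full merged URL string, any parameter change on either side produces a new key and therefore a new refer, while unchanged providers keep their existing connection.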

Now look at getClients(url):

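// Get the ExchangeClient instances for the provider url (after merging consumer parameters)
private ExchangeClient[] getClients(URL url) {
    // whether to share the (TCP) connection
    boolean serviceShareConnect = false;
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // if connections is not configured, share one connection; otherwise each service gets its own connection(s)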
    if (connections == 0) {
        serviceShareConnect = true;
        connections = 1;
    }

    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (serviceShareConnect) {
            // shared
            clients[i] = getSharedClient(url);
        } else {
            clients[i] = initClient(url);
        }
    }
    return clients;
}

// Get the shared TCP connection
private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress(); // provider ip:port
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null) {
        if (!client.isClosed()) {
            client.incrementAndGetCount();
            return client;
        } else {
            referenceClientMap.remove(key);
        }
    }
    synchronized (key.intern()) { // intern() returns the pooled instance of this string, adding it to the pool first if absent, so equal keys lock on the same object
        ExchangeClient exchangeClient = initClient(url); // initialize
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client;
    }
}

// Create a new TCP connection.
private ExchangeClient initClient(URL url) {
    // client type setting.---netty
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    // protocol codec
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // heartbeat is enabled by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO has serious performance problems and is not allowed for now
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                               " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
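        // if the connection is configured as lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // create a lazy client; the TCP connection is not opened until the first request
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // otherwise open the TCP connection right away and return a HeaderExchangeClient instance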
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

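Note: because the connection is shared here, only one long-lived TCP connection is established between a consumer machine and a given provider address (the cache key is the provider's ip:port). A connection count can also be configured through the connections parameter, in which case multiple connections are established.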
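The idea behind getSharedClient, one reference-counted connection per provider address, can be sketched as follows. This is a minimal sketch under stated assumptions: CountedClient and SharedClientCache are hypothetical stand-ins for ReferenceCountExchangeClient and referenceClientMap, no real socket is opened, and a synchronized method replaces Dubbo's synchronized (key.intern()) block.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted client; it never opens a socket.
final class CountedClient {
    final String address;
    private final AtomicInteger refCount = new AtomicInteger(1);
    private volatile boolean closed;

    CountedClient(String address) {
        this.address = address;
    }

    int retain() {
        return refCount.incrementAndGet();
    }

    int count() {
        return refCount.get();
    }

    boolean isClosed() {
        return closed;
    }

    // The underlying connection is only closed when the last holder releases it.
    void close() {
        if (refCount.decrementAndGet() <= 0) {
            closed = true;
        }
    }
}

final class SharedClientCache {
    private final ConcurrentMap<String, CountedClient> clients =
            new ConcurrentHashMap<String, CountedClient>();
    int connectionsOpened;

    // address plays the role of url.getAddress(), i.e. the provider ip:port
    synchronized CountedClient getShared(String address) {
        CountedClient client = clients.get(address);
        if (client != null && !client.isClosed()) {
            client.retain();
            return client;
        }
        client = new CountedClient(address); // stands in for initClient(url)
        connectionsOpened++;
        clients.put(address, client);
        return client;
    }

    public static void main(String[] args) {
        SharedClientCache cache = new SharedClientCache();
        CountedClient first = cache.getShared("172.16.132.166:20881");
        CountedClient second = cache.getShared("172.16.132.166:20881");
        System.out.println((first == second) + ", connections opened: " + cache.connectionsOpened);
    }
}
```

Two services referred from the same provider therefore share one connection, and closing one reference leaves the connection open for the other.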

Execution finally reaches HeaderExchanger.connect(URL url, ExchangeHandler handler):

// Connect to a server. Transporters.connect() connects to the server and returns a NettyClient instance
// @param url provider url after merging consumer parameters
// @param handler the request handler: private final ExchangeHandler requestHandler = new ExchangeHandlerAdapter(){}
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    // Transporters.connect() --> NettyClient
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

This calls Transporters.connect:

// Connect to a server and return a NettyClient
// @param url provider url after merging consumer parameters
// @param handlers the DecodeHandler instance
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    ChannelHandler handler;
    if (handlers == null || handlers.length == 0) {
        handler = new ChannelHandlerAdapter();
    } else if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }
    return getTransporter().connect(url, handler);
}

Next, NettyTransporter.connect is executed:

// Connect to a server
// @param url server url: the provider url after merging consumer parameters
// @param listener the DecodeHandler instance
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}
// NettyClient initialization
// @param url provider url after merging consumer parameters
// @param handler the DecodeHandler instance
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
    // wrapChannelHandler(url, handler) --> a MultiMessageHandler instance
    super(url, wrapChannelHandler(url, handler));
}

protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
    url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
    url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
    // url: the merged URL plus the thread name and thread pool type key/value pairs
    return ChannelHandlers.wrap(handler, url);
}

public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url)));
}

The handler is wrapped further here. As on the provider side, there are six layers in total:

MultiMessageHandler
  -->HeartbeatHandler
    -->AllChannelHandler
      -->DecodeHandler
        -->HeaderExchangeHandler
          -->ExchangeHandlerAdapter instance (an anonymous inner class in DubboProtocol)
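The six layers are plain decorators: each handler holds a reference to the next one and forwards the event after doing its own work. A minimal sketch of that nesting follows, assuming a hypothetical one-method SketchHandler interface instead of Dubbo's ChannelHandler; the layers here only record their names so the traversal order becomes visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method handler; Dubbo's ChannelHandler has more callbacks.
interface SketchHandler {
    void received(String message, List<String> trace);
}

// A decorator that records its own name and then delegates inward.
final class NamedWrapper implements SketchHandler {
    private final String name;
    private final SketchHandler next;

    NamedWrapper(String name, SketchHandler next) {
        this.name = name;
        this.next = next;
    }

    public void received(String message, List<String> trace) {
        trace.add(name);
        next.received(message, trace);
    }
}

final class HandlerChainSketch {
    // Build the chain inside-out, in the same order the article's call stack wraps it.
    static SketchHandler build() {
        SketchHandler innermost = new SketchHandler() {
            public void received(String message, List<String> trace) {
                trace.add("ExchangeHandlerAdapter");
            }
        };
        SketchHandler handler = new NamedWrapper("HeaderExchangeHandler", innermost);
        handler = new NamedWrapper("DecodeHandler", handler);
        handler = new NamedWrapper("AllChannelHandler", handler);
        handler = new NamedWrapper("HeartbeatHandler", handler);
        return new NamedWrapper("MultiMessageHandler", handler);
    }

    static List<String> trace(String message) {
        List<String> trace = new ArrayList<String>();
        build().received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace("ping"));
    }
}
```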

After that, the constructor of the parent class AbstractClient is called:

// @param url provider url after merging consumer parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
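    // whether to reconnect on send, true/false (the original author marks this as unclear)
    sendReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    // shutdown timeout for closing the connection: 1000 * 60 * 15
    shutdownTimeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT);

    // the default reconnect interval is 2s; 1800 means one warning per hour, i.e. 1800 * 2s = 3600s = 1 hour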
    reconnectWarningPeriod = url.getParameter("reconnect.warning.period", 1800);

    try {
        doOpen(); // the actual initialization of the netty client
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    try {
        connect(); // the netty client connects to the server-side netty server
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
        }
    } catch (RemotingException t) {
        if (url.getParameter(Constants.CHECK_KEY, true)) {
            close();
            throw t;
        } else {
            logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
        }
    } catch (Throwable t) {
        close();
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
                        + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
    }
    // obtain the thread pool
    executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
    ExtensionLoader.getExtensionLoader(DataStore.class)
            .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}

The constructors of the parent classes AbstractEndpoint and AbstractPeer are called next:

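// codec: DubboCountCodec
private Codec2 codec;
// timeout for remote service calls
private int timeout;
// connect timeout
private int connectTimeout;

// @param url provider url after merging consumer parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance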
public AbstractEndpoint(URL url, ChannelHandler handler) {
    super(url, handler);
    this.codec = getChannelCodec(url); // dubbo
    this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC call timeout, default 1s
    this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); // connect timeout, default 3s
}
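
// fields and constructor of AbstractPeer
private final ChannelHandler handler; // the MultiMessageHandler instance

private volatile URL url; // provider url after merging consumer parameters

// @param url provider url after merging consumer parameters (without the thread name and thread pool type key/value pairs)
// @param handler the MultiMessageHandler instance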
public AbstractPeer(URL url, ChannelHandler handler) {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    this.url = url;
    this.handler = handler;
}

After this series of assignments, the Netty client is opened:

protected void doOpen() throws Throwable {
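    // set the logger factory
    NettyHelper.setNettyLoggerFactory();
    bootstrap = new ClientBootstrap(channelFactory); // Netty client bootstrap class
    // config: set the TCP options of the Channel
    // @see org.jboss.netty.channel.socket.SocketChannelConfig
    bootstrap.setOption("keepAlive", true); // enable TCP keep-alive probing
    bootstrap.setOption("tcpNoDelay", true); // disable Nagle's algorithm for lower latency
    bootstrap.setOption("connectTimeoutMillis", getTimeout()); // connect timeout (note: as quoted, the value passed is getTimeout(), not getConnectTimeout())
    // ChannelHandler that processes inbound and outbound events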
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            // constructor arguments: <DubboCountCodec, provider url after merging consumer parameters, ChannelHandler (the NettyClient instance)>
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);

            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder()); // decoder
            pipeline.addLast("encoder", adapter.getEncoder()); // encoder
            pipeline.addLast("handler", nettyHandler); // client-side logic handler for inbound and outbound events
            return pipeline;
        }
    });
}

Then the client connects to the Netty server:

// connect to the server
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) { // already connected to the server?
            return;
        }
        // periodically check the connection state and reconnect if it has dropped (scheduled task)
        initConnectStatusCheckCommand();
        // connect to the server
        doConnect();
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                    + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                    + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successful connect to server " + getRemoteAddress()
                        + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                        + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        reconnectCount.set(0); // reset the reconnect counter to 0
        reconnectErrorLogFlag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}
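// connect to the server
protected void doConnect() throws Throwable {
    long start = System.currentTimeMillis(); // time at which the connect attempt starts
    // getConnectAddress() returns the server address
    ChannelFuture future = bootstrap.connect(getConnectAddress()); // asynchronous connect; a future is returned
    try {
        // wait for the connection to be established, up to the configured timeout
        boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
        // the connection was established within the timeout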
        if (ret && future.isSuccess()) {
            Channel newChannel = future.getChannel();
            newChannel.setInterestOps(Channel.OP_READ_WRITE); // the Channel is interested in read and write events
            try {
                // close the old connection
                Channel oldChannel = NettyClient.this.channel; // copy reference
                if (oldChannel != null) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
                        }
                        oldChannel.close();
                    } finally {
                        // remove the channel from the cache
                        NettyChannel.removeChannelIfDisconnected(oldChannel);
                    }
                }
            } finally {
                // has the client been closed in the meantime?
                if (NettyClient.this.isClosed()) {
                    try {
                        if (logger.isInfoEnabled()) {
                            logger.info("Close new netty channel " + newChannel + ", because the client closed.");
                        }
                        newChannel.close();
                    } finally {
                        NettyClient.this.channel = null;
                        NettyChannel.removeChannelIfDisconnected(newChannel);
                    }
                } else {
                    // assign the new channel to the channel member variable
                    NettyClient.this.channel = newChannel;
                }
            }
        } else if (future.getCause() != null) {
            // the connection attempt failed with an exception
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "+ getRemoteAddress() + ", error message is: " + future.getCause().getMessage(), future.getCause());
        } else {
            // the connection attempt timed out
            throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
                    + getRemoteAddress() + " client-side timeout "
                    + getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
        }
    } finally {
        if (!isConnected()) {
            // cancel the connect attempt
            future.cancel();
        }
    }
}

At this point the NettyClient has been created. Next, the client is wrapped in a HeaderExchangeClient:

// heartbeat threads
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
// the NettyClient instance
private final Client client;
// the HeaderExchangeChannel instance
private final ExchangeChannel channel;
// heartbeat timer
private ScheduledFuture<?> heatbeatTimer;
// heartbeat settings in milliseconds; the default is 0, which means no heartbeat is sent
private int heartbeat; // heartbeat interval
private int heartbeatTimeout; // heartbeat timeout

// client: the NettyClient instance, needHeartbeat: true
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
    if (client == null) {
        throw new IllegalArgumentException("client == null");
    }
    this.client = client; // the NettyClient instance
    this.channel = new HeaderExchangeChannel(client);
    String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); // dubbo protocol version
    this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
    this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (heartbeatTimeout < heartbeat * 2) {
        throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
    }
    if (needHeartbeat) {
        // start heartbeat checking
        startHeatbeatTimer();
    }
}

This starts the heartbeat.
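The article does not show what the heartbeat timer does on each tick, so the following is only a sketch of the kind of decision such a task makes, under assumptions: HeartbeatSketch, Action and decide are hypothetical names, the rule "send a heartbeat when the channel has been idle longer than heartbeat, reconnect when nothing was read for longer than heartbeatTimeout" is an assumed simplification, and giving the timeout check precedence is a choice made here for clarity.

```java
final class HeartbeatSketch {
    enum Action { NONE, SEND_HEARTBEAT, RECONNECT }

    // All times are in milliseconds. heartbeat <= 0 disables the check,
    // matching the "default 0, no heartbeat" comment in the field listing.
    static Action decide(long now, long lastRead, long lastWrite,
                         int heartbeat, int heartbeatTimeout) {
        if (heartbeat <= 0) {
            return Action.NONE;
        }
        if (now - lastRead > heartbeatTimeout) {
            return Action.RECONNECT;      // assumed: the peer is considered dead
        }
        if (now - lastRead > heartbeat || now - lastWrite > heartbeat) {
            return Action.SEND_HEARTBEAT; // assumed: probe an idle channel
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        int heartbeat = 60 * 1000;            // assumed interval for this example
        int heartbeatTimeout = heartbeat * 3; // same default ratio as the constructor
        System.out.println(decide(100000L, 30000L, 90000L, heartbeat, heartbeatTimeout));
    }
}
```

The constructor's check that heartbeatTimeout is at least twice heartbeat fits this picture: it leaves room for at least one probe before the connection is declared dead.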

Finally, the HeaderExchangeClient instance is wrapped in a ReferenceCountExchangeClient:

// @param client the HeaderExchangeClient instance
// @param ghostClientMap
public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
    this.client = client;
    refenceCount.incrementAndGet();
    this.url = client.getUrl();
    if (ghostClientMap == null) {
        throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
    }
    this.ghostClientMap = ghostClientMap;
}

The result is put into the Map<String, ReferenceCountExchangeClient> referenceClientMap cache, and the ReferenceCountExchangeClient is then wrapped in the DubboInvoker. The DubboInvoker now looks like this:

-->Map<String, String> attachment: {interface=com.alibaba.dubbo.demo.DemoService}
-->ExchangeClient[] clients:[ReferenceCountExchangeClient instance] // with multiple connections configured, there are several clients here
-->Class<T> type: interface com.alibaba.dubbo.demo.DemoService
-->Url url: dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=20455&register.ip=172.16.132.166&remote.timestamp=1551525684453&side=consumer&timestamp=1551534134492

The DubboInvoker instance is then wrapped in the Filter chain:

ConsumerContextFilter->FutureFilter->MonitorFilter->DubboInvoker.
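A chain like the one above is typically built by walking the filter list backwards, so that the first filter ends up outermost. The sketch below illustrates that technique with hypothetical ChainInvoker and ChainFilter interfaces that pass plain strings; it is not the body of ProtocolFilterWrapper.buildInvokerChain, which the article does not show.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplified contracts: a request is a String, a result is a String.
interface ChainInvoker {
    String invoke(String request);
}

interface ChainFilter {
    String invoke(ChainInvoker next, String request);
}

final class FilterChainSketch {
    // Wrap target so that filters.get(0) runs first and target runs last.
    static ChainInvoker build(ChainInvoker target, List<ChainFilter> filters) {
        ChainInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final ChainFilter filter = filters.get(i);
            final ChainInvoker next = last;
            last = new ChainInvoker() {
                public String invoke(String request) {
                    return filter.invoke(next, request);
                }
            };
        }
        return last;
    }

    // A filter that prefixes its name, making the call order visible in the result.
    static ChainFilter named(final String name) {
        return new ChainFilter() {
            public String invoke(ChainInvoker next, String request) {
                return name + "->" + next.invoke(request);
            }
        };
    }

    public static void main(String[] args) {
        ChainInvoker target = new ChainInvoker() {
            public String invoke(String request) {
                return "DubboInvoker(" + request + ")";
            }
        };
        ChainInvoker chain = build(target, Arrays.asList(
                named("ConsumerContextFilter"), named("FutureFilter"), named("MonitorFilter")));
        System.out.println(chain.invoke("sayHello"));
    }
}
```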

The filtered Invoker is then wrapped in a ListenerInvokerWrapper instance, and that in turn is wrapped in an InvokerDelegete instance. The ultimate purpose of all this is to initialize two properties of RegistryDirectory:

Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance], *=[RegistryDirectory$InvokerDelegete instance]}

Map<String, Invoker<T>> urlInvokerMap={dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=21870&register.ip=192.168.0.101&remote.timestamp=1551540659260&side=consumer&timestamp=1551541271221
=RegistryDirectory$InvokerDelegete instance}
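The shape of methodInvokerMap, one entry per method name plus a "*" entry holding every invoker, can be reproduced with a small grouping routine. This is a sketch under assumptions: MethodMapSketch is a hypothetical name, invokers are represented by their address strings, and the comma-separated value stands in for the methods parameter of each provider URL; the body of toMethodInvokers itself is not shown in the article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MethodMapSketch {
    // invokerToMethods: invoker id -> comma-separated method names
    static Map<String, List<String>> group(Map<String, String> invokerToMethods) {
        Map<String, List<String>> result = new LinkedHashMap<String, List<String>>();
        List<String> all = new ArrayList<String>();
        for (Map.Entry<String, String> entry : invokerToMethods.entrySet()) {
            String methods = entry.getValue();
            if (methods != null && methods.length() > 0) {
                for (String method : methods.split(",")) {
                    if (method.length() == 0 || "*".equals(method)) {
                        continue; // "*" is reserved for the catch-all entry
                    }
                    List<String> invokers = result.get(method);
                    if (invokers == null) {
                        invokers = new ArrayList<String>();
                        result.put(method, invokers);
                    }
                    invokers.add(entry.getKey());
                }
            }
            all.add(entry.getKey());
        }
        result.put("*", all); // every invoker, regardless of method
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> input = new LinkedHashMap<String, String>();
        input.put("172.16.132.166:20881", "sayHello");
        System.out.println(group(input));
    }
}
```

With the single provider from this walkthrough, the result has the same two keys as the dump above: sayHello and *.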

At this point the subscription is complete. Now look at the last line of RegistryProtocol.doRefer: return cluster.join(directory).

The cluster here is a Cluster$Adaptive instance:

public class Cluster$Adaptive implements com.alibaba.dubbo.rpc.cluster.Cluster {
    public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("cluster", "failover"); // failover
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])");
        com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
        return extension.join(arg0);
    }
}

Here extName = "failover". Dubbo's AOP-style extension wrapping applies at this point, so MockClusterWrapper wraps FailoverCluster:

public class MockClusterWrapper implements Cluster {

    private Cluster cluster; // the FailoverCluster instance

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

The cluster field here is a FailoverCluster instance.

// Failover: when a call fails, retry on another server. Usually used for read operations; retries add latency.
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}

public FailoverClusterInvoker(Directory<T> directory) {
    super(directory);
}

All this really does is create a FailoverClusterInvoker instance, whose properties are stored by its parent class AbstractClusterInvoker.
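Only the constructor of FailoverClusterInvoker appears in this walkthrough, so the invocation path is not visible here. As a rough illustration of what the class comment means by "retry on another server", here is a sketch under assumptions: RetryTarget and FailoverSketch are hypothetical names, providers are tried in list order rather than through a load balancer, and retries counts the additional attempts after the first call.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical provider abstraction: a call either returns or throws a RuntimeException.
interface RetryTarget {
    String call(String request);
}

final class FailoverSketch {
    static String invoke(List<RetryTarget> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries, 0) + 1; // first call plus the retries
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            // move on to the next provider on each attempt, wrapping around the list
            RetryTarget target = providers.get(i % providers.size());
            try {
                return target.call(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw new RuntimeException("all " + attempts + " attempts failed", last);
    }

    public static void main(String[] args) {
        RetryTarget down = new RetryTarget() {
            public String call(String request) {
                throw new IllegalStateException("connection refused");
            }
        };
        RetryTarget up = new RetryTarget() {
            public String call(String request) {
                return "ok:" + request;
            }
        };
        System.out.println(invoke(Arrays.asList(down, up), "sayHello", 2));
    }
}
```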

Finally a MockClusterInvoker instance is created:

private final Directory<T> directory; // the RegistryDirectory instance

private final Invoker<T> invoker; // the FailoverClusterInvoker instance

public MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) {
    this.directory = directory;
    this.invoker = invoker;
}

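At this point the line marked ###1 in createProxy (repeated below) has finished. What it returns is a MockClusterInvoker instance:

  • directory = the RegistryDirectory instance
  • invoker = a FailoverClusterInvoker instance (which itself holds a Directory<T> directory property whose value is the same RegistryDirectory instance)
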
private T createProxy(Map<String, String> map) {
    ...
    if (urls.size() == 1) {
        invoker = refprotocol.refer(interfaceClass, urls.get(0)); // refer the service ###1
    } 
    ...
    // create the service proxy
    return (T) proxyFactory.getProxy(invoker); // ###2
}

2. Creating the proxy with ProxyFactory

(T) proxyFactory.getProxy(invoker);

The proxyFactory above is a ProxyFactory$Adaptive instance. When getProxy(Invoker<T> invoker) is called, what it resolves to internally is a JavassistProxyFactory wrapped by StubProxyFactoryWrapper. Calling JavassistProxyFactory.getProxy(Invoker<T> invoker) first runs the parent method AbstractProxyFactory.getProxy(Invoker<T> invoker), which then calls JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] types).

public abstract class AbstractProxyFactory implements ProxyFactory {

    public <T> T getProxy(Invoker<T> invoker) throws RpcException {
        Class<?>[] interfaces = null; // interfaces the service proxy class will implement
        String config = invoker.getUrl().getParameter("interfaces");
        if (config != null && config.length() > 0) {
            String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
            if (types != null && types.length > 0) {
                interfaces = new Class<?>[types.length + 2];
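                interfaces[0] = invoker.getInterface(); // business interface
                interfaces[1] = EchoService.class; // echo test interface
                for (int i = 0; i < types.length; i++) {
                    // as quoted: for i == 0 this index (i + 1) overwrites the EchoService slot set just above
                    interfaces[i + 1] = ReflectUtils.forName(types[i]);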
                }
            }
        }
        if (interfaces == null) { // business interface + echo test interface
            interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
        }
        return getProxy(invoker, interfaces);
    }

    // Create the service proxy instance
    // @param invoker the invoker that carries out the call
    // @param types interface types
    // @param <T> business interface
    public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);

}
// JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] types)
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

  • invoker: the MockClusterInvoker instance
  • interfaces: [interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]

Note that the Proxy used here is Dubbo's own com.alibaba.dubbo.common.bytecode.Proxy, not the JDK's java.lang.reflect.Proxy.
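For comparison, the same delegation pattern can be written with the JDK's java.lang.reflect.Proxy. This is deliberately not what JavassistProxyFactory does (Dubbo generates bytecode instead). It is a sketch with hypothetical names: GreetingService stands in for DemoService, and RemoteInvoker is a simplified invoker that receives only the method name and arguments.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical business interface standing in for DemoService.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical simplified invoker: method name and arguments in, result out.
interface RemoteInvoker {
    Object invoke(String methodName, Object[] args);
}

final class JdkProxySketch {
    @SuppressWarnings("unchecked")
    static <T> T getProxy(final RemoteInvoker invoker, Class<T> type) {
        InvocationHandler handler = new InvocationHandler() {
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // toString/hashCode/equals are answered locally instead of being sent to the invoker
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(invoker, args);
                }
                return invoker.invoke(method.getName(), args);
            }
        };
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        RemoteInvoker invoker = new RemoteInvoker() {
            public Object invoke(String methodName, Object[] arguments) {
                return methodName + ":" + arguments[0];
            }
        };
        GreetingService service = getProxy(invoker, GreetingService.class);
        System.out.println(service.sayHello("world"));
    }
}
```

In both approaches the caller only sees the business interface, and every method call is funnelled into a single invoke method that hands it to the invoker.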

Proxy.getProxy(interfaces):

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    ...
    Proxy proxy = null;
    ...
    // create ProxyInstance class: the service proxy class
    String pcn = pkg + ".proxy" + id; // class name: com.alibaba.dubbo.common.bytecode.proxy0
    ccp.setClassName(pcn);
    ccp.addField("public static java.lang.reflect.Method[] methods;");
    ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
    ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
    ccp.addDefaultConstructor();
    Class<?> clazz = ccp.toClass();
    clazz.getField("methods").set(null, methods.toArray(new Method[0]));

    // create Proxy class: the factory class that creates service proxy instances (its newInstance() method)
    String fcn = Proxy.class.getName() + id; // com.alibaba.dubbo.common.bytecode.Proxy0
    ccm = ClassGenerator.newInstance(cl);
    ccm.setClassName(fcn);
    ccm.addDefaultConstructor();
    ccm.setSuperClass(Proxy.class);
    ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
    Class<?> pc = ccm.toClass();
    proxy = (Proxy) pc.newInstance(); 
    ...
    return proxy; // return the factory class instance
}

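As the code shows, two Class objects are generated: pc is the factory class that creates service proxy instances, and clazz is the service proxy class. The proxy that is finally returned is an instance of the com.alibaba.dubbo.common.bytecode.Proxy0 class shown below. After that, com.alibaba.dubbo.common.bytecode.Proxy0.newInstance(InvocationHandler invocationHandler) is called to create a com.alibaba.dubbo.common.bytecode.proxy0 instance, whose InvocationHandler handler field is initialized with the InvokerInvocationHandler instance.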

In the end two classes are generated:

// com.alibaba.dubbo.common.bytecode.Proxy0: the factory class
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.Proxy;
import com.alibaba.dubbo.common.bytecode.proxy0;
import java.lang.reflect.InvocationHandler;

public class Proxy0 extends Proxy implements ClassGenerator.DC {
    @Override
    public Object newInstance(InvocationHandler invocationHandler) {
        return new proxy0(invocationHandler); // create a service proxy instance
    }
}

// com.alibaba.dubbo.common.bytecode.proxy0: the service proxy class
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.rpc.service.EchoService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class proxy0 implements ClassGenerator.DC, EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler; // an InvokerInvocationHandler instance

    public String sayHello(String string) {
        Object[] arrobject = new Object[]{string};
        Object object = this.handler.invoke(this, methods[0], arrobject);
        return (String)object;
    }

    public Object $echo(Object object) {
        Object[] arrobject = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[1], arrobject);
        return object2;
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}

The Method[] methods array above actually contains two elements:

[public abstract java.lang.String com.alibaba.dubbo.demo.DemoService.sayHello(java.lang.String),

public abstract java.lang.Object com.alibaba.dubbo.rpc.service.EchoService.$echo(java.lang.Object)]
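The relationship between the two generated classes can be sketched by hand, without Javassist. The following is only an illustration under stated assumptions: GreetingService, ProxyFactorySketch, GreetingProxy, GreetingProxyFactory and TwoClassProxyDemo are hypothetical names that stand in for DemoService, Dubbo's Proxy, proxy0 and Proxy0; they are not part of Dubbo.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

// Hypothetical stand-in for com.alibaba.dubbo.demo.DemoService.
interface GreetingService {
    String sayHello(String name);
}

// Plays the role of Dubbo's abstract Proxy class: a factory for proxy instances.
abstract class ProxyFactorySketch {
    abstract Object newInstance(InvocationHandler handler);
}

// Plays the role of the generated proxy0: every interface method
// is forwarded to the handler together with its Method object.
class GreetingProxy implements GreetingService {
    static Method[] methods; // filled in once, after the class is defined
    private final InvocationHandler handler;

    GreetingProxy(InvocationHandler handler) {
        this.handler = handler;
    }

    @Override
    public String sayHello(String name) {
        try {
            return (String) handler.invoke(this, methods[0], new Object[]{name});
        } catch (RuntimeException | Error e) {
            throw e;
        } catch (Throwable t) {
            // Hand-written Java must deal with the checked Throwable explicitly.
            throw new IllegalStateException(t);
        }
    }
}

// Plays the role of the generated Proxy0: its only job is to create proxy instances.
class GreetingProxyFactory extends ProxyFactorySketch {
    @Override
    Object newInstance(InvocationHandler handler) {
        return new GreetingProxy(handler);
    }
}

final class TwoClassProxyDemo {
    static GreetingService create(InvocationHandler handler) {
        try {
            // Mirrors clazz.getField("methods").set(null, ...) in getProxy.
            GreetingProxy.methods = new Method[]{
                    GreetingService.class.getMethod("sayHello", String.class)};
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        ProxyFactorySketch factory = new GreetingProxyFactory();
        return (GreetingService) factory.newInstance(handler);
    }

    public static void main(String[] args) {
        GreetingService service = create((proxy, method, a) -> method.getName() + ":" + a[0]);
        System.out.println(service.sayHello("world")); // prints "sayHello:world"
    }
}
```

Splitting the work into a factory class and a proxy class means that an instance can be created through an ordinary virtual call on the factory, once the classes have been generated.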

As shown above, the service proxy object that is finally returned is a proxy0 instance. Calling its sayHello method invokes the internal handler.invoke method.

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker; // a MockClusterInvoker instance

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    // Returns the result of the call
    // @param proxy  the service proxy (a com.alibaba.dubbo.common.bytecode.proxy0 instance)
    // @param method the method being called
    // @param args   the method arguments
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // Methods declared on Object are invoked directly on the invoker
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // Starting point of the RPC call: build the Invocation
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

The invoker here is the MockClusterInvoker instance described above.
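The dispatch rule in the handler (methods declared on Object are answered by the invoker itself, everything else becomes a remote-style invocation) can be tried out in isolation. The sketch below is a simplification under assumptions: it uses the JDK's java.lang.reflect.Proxy instead of Dubbo's bytecode Proxy, and MiniInvoker, RecordingInvoker and DispatchingHandler are hypothetical names, with MiniInvoker being a much reduced stand-in for Dubbo's Invoker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class HandlerDispatchSketch {

    // Hypothetical, heavily simplified stand-in for Dubbo's Invoker.
    interface MiniInvoker {
        Object invoke(String methodName, Object[] args);
    }

    // Hypothetical stand-in for the demo service interface.
    interface DemoService {
        String sayHello(String name);
    }

    // Pretends to perform the remote call and reports what it was asked to do.
    static final class RecordingInvoker implements MiniInvoker {
        @Override
        public Object invoke(String methodName, Object[] args) {
            return "remote:" + methodName + Arrays.toString(args);
        }

        @Override
        public String toString() {
            return "RecordingInvoker";
        }
    }

    // Same branching idea as InvokerInvocationHandler, reduced to two cases.
    static final class DispatchingHandler implements InvocationHandler {
        private final MiniInvoker invoker;

        DispatchingHandler(MiniInvoker invoker) {
            this.invoker = invoker;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // toString/hashCode/equals are answered locally by the invoker object.
            if (method.getDeclaringClass() == Object.class) {
                return method.invoke(invoker, args);
            }
            // Everything else is forwarded as an invocation.
            return invoker.invoke(method.getName(), args);
        }
    }

    static DemoService newProxy(MiniInvoker invoker) {
        return (DemoService) Proxy.newProxyInstance(
                DemoService.class.getClassLoader(),
                new Class<?>[]{DemoService.class},
                new DispatchingHandler(invoker));
    }

    public static void main(String[] args) {
        DemoService service = newProxy(new RecordingInvoker());
        System.out.println(service.sayHello("world")); // prints "remote:sayHello[world]"
        System.out.println(service.toString());        // prints "RecordingInvoker"
    }
}
```

Answering Object methods locally keeps calls such as toString() on the reference from triggering a network round trip.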

At this point, the line DemoService demoService = (DemoService) context.getBean("demoService"); has finished executing. The resulting demoService is an instance of com.alibaba.dubbo.common.bytecode.proxy0, that is, a proxy.
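Because the proxy class implements every interface in the interfaces list (DemoService and EchoService in this walkthrough), the same reference can be cast to either of them. The sketch below illustrates only that property, under assumptions: it uses the JDK's java.lang.reflect.Proxy rather than Dubbo's generated proxy0, the two nested interfaces are local stand-ins with the same method shapes, and the handler simply echoes or greets instead of making an RPC call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

final class MultiInterfaceProxySketch {

    // Local stand-ins mirroring the method shapes of the two real interfaces.
    interface DemoService {
        String sayHello(String name);
    }

    interface EchoService {
        Object $echo(Object message);
    }

    static Object newProxy() {
        // Hypothetical handler: echoes the argument or builds a greeting locally.
        InvocationHandler handler = (proxy, method, args) -> {
            if ("$echo".equals(method.getName())) {
                return args[0];
            }
            return "Hello " + args[0];
        };
        return Proxy.newProxyInstance(
                MultiInterfaceProxySketch.class.getClassLoader(),
                new Class<?>[]{DemoService.class, EchoService.class},
                handler);
    }

    public static void main(String[] args) {
        Object reference = newProxy();
        DemoService demo = (DemoService) reference; // business interface
        EchoService echo = (EchoService) reference; // same object, second interface
        System.out.println(demo.sayHello("world")); // prints "Hello world"
        System.out.println(echo.$echo("OK"));       // prints "OK"
    }
}
```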