Dubbo 服务引用之构建客户端总体流程

ReferenceBean/ReferenceConfig

Posted by Jay on March 3, 2019

Dubbo 服务引用之构建客户端总体流程

一、示例

1.配置文件
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd">

    <!-- 消费方应用名,用于计算依赖关系,不是匹配条件,不要与提供方一样 -->
    <dubbo:application name="demo-consumer"/>
    <!-- 使用zookeeper注册中心 -->
    <dubbo:registry protocol="zookeeper" address="localhost:2181" client="zkclient" group="dubbo_test"/>

    <!-- 生成远程服务代理,可以和本地bean一样使用demoService -->
    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>

</beans>
2.Consumer
public class Consumer {
    public static void main(String[] args) {
        System.setProperty("java.net.preferIPv4Stack", "true");
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        DemoService demoService = (DemoService) context.getBean("demoService"); // 获取远程服务代理

        String hello = demoService.sayHello("world"); // 调用远程方法
        System.out.println(hello); // 显示调用结果

    }
}

构建消费者端时重点看:

DemoService demoService = (DemoService) context.getBean("demoService"); // 获取远程服务代理

二、调用简图

首先 ReferenceConfig 类的 init 方法调用 Protocolrefer 方法生成 Invoker 实例(如上图中的红色部分),这是服务消费的关键。接下来把 Invoker 转换为客户端需要的接口(如:HelloWorld)。

三、总体代码调用链

1.Consumer初始化
ReferenceConfig.init()
-->createProxy(Map<String, String> map)
  //一、获取Invoker
  -->RegistryProtocol.refer(Class<T> type, URL url)
    //1 连接ZK注册中心,创建ZkClient实例
    -->Registry registry = registryFactory.getRegistry(url)
      -->AbstractRegistryFactory.getRegistry(URL url)
        -->ZookeeperRegistryFactory.createRegistry(URL url)
          -->new ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter)
            -->ZkclientZookeeperTransporter.connect(URL url)
              -->new ZkclientZookeeperClient(URL url)
                -->new ZkClientWrapper(url.getBackupAddress(), 30000)
        -->AbstractRegistryFactory.Map<String, Registry> REGISTRIES.put("zookeeper://127.0.0.1:2181/dubbo_test/com.alibaba.dubbo.registry.RegistryService", 上边的ZookeeperRegistry实例)
    -->doRefer(Cluster cluster, Registry registry, Class<T> type, URL url)
      -->new RegistryDirectory<T>(type, url)
      //2 向ZK注册中心注册消费者url
      -->registry.register(url)
        -->ZookeeperRegistry.doRegister(URL url)
          -->AbstractZookeeperClient.create(String path, boolean ephemeral)
    //3 订阅providers、configurators、routers数据
      -->RegistryDirectory.subscribe(URL url)
        -->ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
          //3.1 会获取当前节点下已经存在的子节点(第一次服务发现发生在这里),添加子节点变化监听器
          -->List<String> children = zkClient.addChildListener(path, zkListener)
          -->AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
            -->saveProperties(url)
            -->RegistryDirectory.notify(List<URL> urls)
              //以下仅仅针对的是providers
              -->refreshInvoker(List<URL> invokerUrls)
                -->toInvokers(List<URL> urls)
                  -->ProtocolListenerWrapper.refer(Class<T> type, URL url)
                    -->ProtocolFilterWrapper.refer(Class<T> type, URL url)
                      -->DubboProtocol.refer(Class<T> serviceType, URL url)
                        //3.1.1 创建ExchangeClient,对第一次服务发现providers路径下的相关url建立长连接
                        -->getClients(URL url)
                          -->getSharedClient(URL url)
                            -->ExchangeClient exchangeClient = initClient(url)
                              -->Exchangers.connect(url, requestHandler)
                                -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
                                  -->new DecodeHandler(new HeaderExchangeHandler(handler)))
                                    -->Transporters.connect(URL url, ChannelHandler... handlers)
                                      -->NettyTransporter.connect(URL url, ChannelHandler listener)
                                        -->new NettyClient(url, listener)
                                          -->new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(handler, url)))
                                          -->getChannelCodec(url) // 获取Codec2,这里是DubboCountCodec实例
                                          -->doOpen() // 初始化netty客户端
                                          -->doConnect() // 连接Netty服务端,建立长连接
                                  -->new HeaderExchangeClient(Client client, boolean needHeartbeat) // 上述client为NettyClient实例,needHeartbeat为true
                                    -->startHeatbeatTimer() // 启动心跳检测
                            -->new ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap)
                            -->Map<String, ReferenceCountExchangeClient> referenceClientMap.put("172.16.132.166:20881", 上边的ReferenceCountExchangeClient实例)
                        //3.2 创建DubboInvoker
                        -->new DubboInvoker(Class<T> serviceType, URL url, ExchangeClient[] clients, Set<Invoker<?>> invokers)
                        -->DubboProtocol.Set<Invoker<?>> invokers.add(上边的DubboInvoker实例)
                      -->ProtocolFilterWrapper.buildInvokerChain(final Invoker<T> invoker, String key, String group)
                    -->new ListenerInvokerWrapper<T>(Invoker<T> invoker, List<InvokerListener> listeners)
                  -->new InvokerDelegete(Invoker<T> invoker, URL url, URL providerUrl)
                  //3.3 将创建出来的Invoker缓存起来
                  -->newUrlInvokerMap.put("dubbo://172.16.132.166:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=16431&register.ip=172.16.132.166&remote.timestamp=1551510809762&side=consumer&timestamp=1551512073471", 上边的InvokerDelegate实例)
                -->toMethodInvokers(newUrlInvokerMap)
                -->Map<String, List<Invoker<T>>> newMethodInvokerMap:{sayHello=[InvokerDelegete实例], *=[InvokerDelegete实例]}
      //4 将directory封装成一个Cluster Invoker(MockClusterInvoker)
      -->cluster.join(directory)
        -->Cluster$Adaptive.join(directory)
          -->ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("failover") // 返回MockClusterWrapper,包装了FailoverCluster
            -->MockClusterWrapper.join(Directory<T> directory)
              -->FailoverCluster.join(Directory<T> directory)
                -->new FailoverClusterInvoker<T>(directory)
              -->new MockClusterInvoker(Directory<T> directory, Invoker<T> invoker) // invoker--上边的FailoverClusterInvoker实例
  //二、获取服务代理
  -->JavassistProxyFactory.getProxy(Invoker<T> invoker, Class<?>[] interfaces) // invoker--上边的MockClusterInvoker实例,   interfaces:[interface com.alibaba.dubbo.demo.DemoService, interface com.alibaba.dubbo.rpc.service.EchoService]
    -->Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker))
      -->Proxy.getProxy(ClassLoader cl, Class<?>... ics) // 使用javassist获取一个动态类
      -->new InvokerInvocationHandler(invoker) //invoker--上边的MockClusterInvoker实例
2.Consumer调用方法

极简版流程图:

com.alibaba.dubbo.common.bytecode.proxy0.sayHello(String name) // 服务代理类
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  // RpcInvocation[methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={}]
  -->MockClusterInvoker.invoke(Invocation invocation)
    -->FailoverClusterInvoker.invoke(final Invocation invocation)
      -->RegistryDirectory.list(Invocation invocation) // 根据RpcInvocation中的methodName获取Invoker
        -->router过滤
        -->LoadBalancer选取一个Invoker
      -->执行Filter链
        // RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
        -->DubboInvoker.invoke(Invocation inv)

在服务提供者端会根据RpcInvocation中的attachments中的path、group、version以及从channel中获取的本地地址的port拼接一个serviceKey:group/path:version:port,例如:com.alibaba.dubbo.demo.DemoService:20881,之后根据这个serviceKey从服务提供者端获取DubboExporter

注意:

1、每一个服务代理实例都会有自己的一个MockClusterInvoker实例,也就有自己的一个RegistryDirectory实例,所以假设消费者A引用了提供者服务B和C(不同的服务),B和C中都有sayHello(String name)方法时,则消费者A端,B服务代理实例中的RegistryDirectory存储sayHello=[对应B的Invoker对象],C服务代理实例中的RegistryDirectory存储sayHello=[对应C的Invoker对象]

2、假设消费者A调用了提供者服务B,B中有方法String sayHello(String name)/String sayHello(String name, Integer age)时,在消费者A端,B服务代理实例中的RegistryDirectory只存储一份的sayHello=[对应B的Invoker对象],注意这里一个Invoker对象其实对应一个Provider实例(服务B),B服务端根据serviceKey:group/path:version:port获取出DubboExporterDubboExporter获取到AbstractProxyInvokerAbstractProxyInvoker中的Wrapper类中就有String sayHello(String name)/String sayHello(String name, Integer age)两个方法,根据Request中的方法名、参数类型和参数值就可以找出具体执行哪一个方法。