Analysis of the Dubbo Client Request Process

Client request

Posted by Jay on March 11, 2019

The client request code is as follows:

DemoService demoService = (DemoService) context.getBean("demoService"); // obtain the remote service proxy
String hello = demoService.sayHello("world"); // invoke the remote method

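The earlier article "Dubbo 服务引用之构建客户端源码解析" (source analysis of how Dubbo builds the client during service reference) showed that the demoService obtained here is a com.alibaba.dubbo.common.bytecode.proxy0 proxy object. The second line of code is analyzed below.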

1. Overall flow of a client request

// the proxy issues the request
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
  -->new RpcInvocation(method, args)
  -->MockClusterInvoker.invoke(Invocation invocation) // where service degradation happens
    // the ClusterInvoker disguises multiple Invokers as one cluster-level Invoker
    -->AbstractClusterInvoker.invoke(final Invocation invocation)
      // obtain the Invokers
      -->list(Invocation invocation)
        -->AbstractDirectory.list(Invocation invocation)
          -->RegistryDirectory.doList(Invocation invocation) // fetch the List<Invoker<T>> stored under key sayHello in Map<String, List<Invoker<T>>> methodInvokerMap
          -->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers) // filter that List<Invoker<T>> once more (here: drop every Invoker whose protocol is mock; if there is none, return the whole list); this is the router's job
      // obtain the load balancer
      -->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)) // default: random
      -->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation) // attach an invocation id for asynchronous calls
      -->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
        // use the load balancer to pick one Invoker: a RegistryDirectory$InvokerDelegete instance
        -->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
          -->doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
            -->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
              -->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
        // run the Filter chain
        -->ListenerInvokerWrapper.invoke
          -->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation) // sets some RpcContext properties and the invoker property of the invocation
            -->FutureFilter.invoke(Invocation invocation)
              -->MonitorFilter.invoke(Invocation invocation) // the monitor collects its data here
                -->AbstractInvoker.invoke(Invocation inv) // resets the invoker and attachment properties of the invocation
                  -->DubboInvoker.doInvoke(final Invocation invocation)
                    // obtain an ExchangeClient to send the message
                    -->ReferenceCountExchangeClient.request(Object request, int timeout) // request: RpcInvocation
                      -->HeaderExchangeClient.request(Object request, int timeout)
                        -->HeaderExchangeChannel.request(Object request, int timeout)
                          -->AbstractPeer.send(Object message) // message: Request
                            -->AbstractClient.send(Object message, boolean sent) // parent class of NettyClient
                              -->getChannel() // NettyChannel instance, whose inner channel is a NioClientSocketChannel instance
                              -->NettyChannel.send(Object message, boolean sent)
                                -->NioClientSocketChannel.write(Object message) // netty code from here on; message is the Request instance, whose key content is RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
                                  -->NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e) // write the request out
                                    -->NettyClient.sent(NettyChannel, e.getMessage()) // e.getMessage(): Request
                                      -->AbstractPeer.sent(NettyChannel, Request)
                                        -->MultiMessageHandler.sent(NettyChannel, Request)
                                          -->HeartbeatHandler.sent(NettyChannel, Request)
                                            -->AllChannelHandler.sent(NettyChannel, Request)
                                              -->DecodeHandler.sent(NettyChannel, Request)
                                                -->HeaderExchangeHandler.sent(NettyChannel, Request)
                                                  -->DubboProtocol.requestHandler.sent(HeaderExchangeChannel, Request)
                                                  -->DefaultFuture.sent(channel, request) // mark the request as sent

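The overall flow is:

  • Wrap the request parameters (method name, parameter types, argument values, service name, attachments) in an RpcInvocation.
    • path in the attachments: the interface name. After the provider receives the request, it uses this value to pick the DubboExporter instance from exportMap.
    • Method name, parameter types and argument values: used by JavassistProxyFactory$AbstractProxyInvoker to execute the corresponding method.
  • From RegistryDirectory's Map<String, List<Invoker<T>>> methodInvokerMap, fetch the List<Invoker<T>> whose key is sayHello (the given method name).
  • Use the Router to filter that List<Invoker<T>> once more, producing a new invoker list.
  • Use the LoadBalance strategy to select one Invoker from the new list; it is actually an InvokerDelegete instance.
  • Use the InvokerDelegete instance to run the Filter chain that wraps the real DubboInvoker, and then reach the real DubboInvoker.
  • DubboInvoker uses NettyClient to send the request to the provider.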

2. Source code analysis

First, look at proxy0.sayHello:

public String sayHello(String string) {
    Object[] arrobject = new Object[]{string};
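    // sayHello takes one String parameter, so the reflective lookup must name that parameter type
    Object object = this.handler.invoke(this, DemoService.class.getMethod("sayHello", String.class), arrobject);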
    return (String)object;
}

Here handler is an InvokerInvocationHandler instance.

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker; // MockClusterInvoker instance

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    // Returns the call result.
    // @param proxy the service proxy (a com.alibaba.dubbo.common.bytecode.proxy0 instance)
    // @param method the method being called
    // @param args the method arguments
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        // methods declared by Object are invoked directly
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // starting point of the RPC call: build the Invocation
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

First the request parameters are wrapped in an RpcInvocation instance, as follows:

-->String methodName=sayHello
-->Class<?>[] parameterTypes=[class java.lang.String]
-->Object[] arguments=[world]
-->Map<String, String> attachments={}
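
To make those four fields concrete, here is a minimal sketch of how a reflective Method plus its arguments can be captured in such an object. InvocationSketch, its of factory and GreetingService are hypothetical names introduced only for this illustration; the sketch mirrors the fields listed above and is not Dubbo's RpcInvocation.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical service interface used only for this illustration.
interface GreetingService {
    String sayHello(String name);
}

// Hypothetical stand-in for RpcInvocation; it is not Dubbo's class.
final class InvocationSketch {
    final String methodName;
    final Class<?>[] parameterTypes;
    final Object[] arguments;
    // Starts empty; later stages of the call add entries such as path and version.
    final Map<String, String> attachments = new HashMap<>();

    InvocationSketch(Method method, Object[] arguments) {
        this.methodName = method.getName();
        this.parameterTypes = method.getParameterTypes();
        this.arguments = arguments == null ? new Object[0] : arguments;
    }

    // Convenience factory for a one-argument method; hides the checked reflection exception.
    static InvocationSketch of(Class<?> type, String name, Class<?> paramType, Object arg) {
        try {
            return new InvocationSketch(type.getMethod(name, paramType), new Object[] {arg});
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    @Override
    public String toString() {
        return "InvocationSketch [methodName=" + methodName
                + ", parameterTypes=" + Arrays.toString(parameterTypes)
                + ", arguments=" + Arrays.toString(arguments)
                + ", attachments=" + attachments + "]";
    }
}
```

Calling InvocationSketch.of(GreetingService.class, "sayHello", String.class, "world") yields an object whose attachments map is still empty, matching the state shown above before the invokers further down the chain add to it.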

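MockClusterInvoker.invoke(Invocation invocation) then performs the remote call:

private final Directory<T> directory; // RegistryDirectory instance
private final Invoker<T> invoker; // FailoverClusterInvoker instance

// invocation: the RpcInvocation instance
// Service degradation is decided here from the configured mock parameter:
// 1. if mock is not configured or mock=false, make the remote call;
// 2. if mock=force:return null is configured, return null directly without a remote call;
// 3. if mock=fail:return null is configured, make the remote call first and fall back to the mock call if it fails.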
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // sayHello.mock->mock->default.mock   
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); // false
    if (value.length() == 0 || value.equalsIgnoreCase("false")) {
        //no mock: call directly
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
        }
        //force: mock directly
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock: mock after the call fails
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            } else {
                if (logger.isWarnEnabled()) {
                    logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                }
                result = doMockInvoke(invocation, e);
            }
        }
    }
    return result;
}

Note: service degradation can be implemented here; see the article on service degradation.
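
The three branches of the invoke method above depend only on the string value of the mock parameter. As a minimal sketch, that decision can be isolated as follows; MockModeSketch and its Mode enum are hypothetical names for illustration and do not exist in Dubbo.

```java
// Hypothetical helper that mirrors the branch conditions of MockClusterInvoker.invoke.
final class MockModeSketch {
    enum Mode { NO_MOCK, FORCE_MOCK, FAIL_MOCK }

    static Mode classify(String mockValue) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            return Mode.NO_MOCK;    // plain remote call
        }
        if (value.startsWith("force")) {
            return Mode.FORCE_MOCK; // skip the remote call and mock directly
        }
        return Mode.FAIL_MOCK;      // remote call first, mock only after a non-business failure
    }
}
```

Note that, as in the original code, any value other than empty, false or one starting with force ends up in the fail-mock branch, not only values that start with fail.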

Next, FailoverClusterInvoker.invoke is called; the method is defined in its parent class AbstractClusterInvoker:

protected final Directory<T> directory; // RegistryDirectory

// invocation: the RpcInvocation instance; performs the remote call
public Result invoke(final Invocation invocation) throws RpcException {
    // check whether this invoker has been destroyed
    checkWhetherDestroyed();
    // load balancer
    LoadBalance loadbalance;
    // get the available invokers for the invocation (the called method), after routing
    List<Invoker<T>> invokers = list(invocation);
    if (invokers != null && invokers.size() > 0) {
        // invokers.get(0).getUrl(): the provider url after merging the consumer parameters; yields RandomLoadBalance
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
    } else {
        loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
    }
    // attach an invocation id by default for asynchronous calls
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    // the real call
    return doInvoke(invocation, invokers, loadbalance);
}

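It first obtains a List<Invoker<T>>, then resolves the LoadBalance strategy, and finally calls doInvoke.

Start with RegistryDirectory.list(Invocation invocation), which is defined in AbstractDirectory, the parent class of RegistryDirectory:

// get the available invokers for the invocation (the called method), after routing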
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    List<Invoker<T>> invokers = doList(invocation); // list the invokers
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && localRouters.size() > 0) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    // router: a MockInvokersSelector instance
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

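It first calls doList(invocation) to obtain the List<Invoker<T>>, then filters the list through each router in turn, and finally returns the filtered List<Invoker<T>>.

RegistryDirectory.doList(invocation):

// list the invokers that match the method
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. there is no provider 2. the providers are disabled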
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation); // method name
        Object[] args = RpcUtils.getArguments(invocation); // method arguments
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) { // the first argument is a String or an enum
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // allows routing on the first argument, e.g. sayHello.world
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
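
The lookup order used by doList can be sketched in isolation. The snippet below is a minimal illustration rather than Dubbo code: MethodLookupSketch and its lookup method are hypothetical names, and plain strings stand in for Invoker instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the key fallback order in RegistryDirectory.doList.
final class MethodLookupSketch {
    static List<String> lookup(Map<String, List<String>> map, String methodName, Object[] args) {
        if (map == null) {
            return new ArrayList<>();
        }
        List<String> found = null;
        // 1. "method.firstArg", only when the first argument is a String or an enum
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            found = map.get(methodName + "." + args[0]);
        }
        // 2. the plain method name
        if (found == null) {
            found = map.get(methodName);
        }
        // 3. the wildcard entry
        if (found == null) {
            found = map.get("*");
        }
        // 4. whatever entry comes first
        if (found == null && !map.isEmpty()) {
            found = map.values().iterator().next();
        }
        return found == null ? new ArrayList<>() : found;
    }
}
```

With a map holding only the keys sayHello and *, a call to sayHello("world") misses the key sayHello.world and falls through to the sayHello entry, while any other method name resolves to the * entry.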

The Map<String, List<Invoker<T>>> methodInvokerMap was already initialized in "Dubbo 服务引用之构建客户端源码解析":

Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete instance], *=[RegistryDirectory$InvokerDelegete instance]}

Here the method name sayHello yields a list holding one RegistryDirectory$InvokerDelegete instance. That list is then filtered by the routers; there is only one Router here, MockInvokersSelector:

// invokers: the invokers listed by RegistryDirectory
// url: the consumer url
// invocation: the RpcInvocation
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
                                  URL url, final Invocation invocation) throws RpcException {
    if (invocation.getAttachments() == null) {
        return getNormalInvokers(invokers); 
    } else {
        String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
        if (value == null)
            return getNormalInvokers(invokers); // drop the mock-protocol invokers and return the rest
        else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
            return getMockedInvokers(invokers); // return only the mock-protocol providers
        }
    }
    return invokers;
}

private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
    if (!hasMockProviders(invokers)) {
        return invokers; // no mock-protocol provider, return the list as is
    } else {
        List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
        for (Invoker<T> invoker : invokers) {
            if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
                sInvokers.add(invoker);
            }
        }
        return sInvokers; // return the list with the mock-protocol invokers removed
    }
}

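No provider uses the mock protocol here, so the list is returned unchanged. At this point the callable RegistryDirectory$InvokerDelegete instances have been selected. Next the load balancing strategy implementation is resolved, RandomLoadBalance by default. Finally FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) is executed: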

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    // number of calls (retries + 1); 2 + 1 by default
    int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    // invokers that have already been called
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len); // addresses (ip:port) of the providers whose call failed
    for (int i = 0; i < len; i++) {
        // Reselect before each retry so that a changed invoker list is picked up.
        // Note: if the list has changed, the invoked check no longer works, because the invoker instances are different.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation); // fetch the invokers from RegistryDirectory again
            checkInvokers(copyInvokers, invocation); // check again
        }
        // pick an invoker through load balancing
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker); // add to the list of called invokers
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) { // an earlier attempt threw an exception
                logger.warn("Although retry the method " + invocation.getMethodName()
                        + " in the service " + getInterface().getName()
                        + " was successful by the provider " + invoker.getUrl().getAddress()
                        + ", but there have been failed providers " + providers
                        + " (" + providers.size() + "/" + copyInvokers.size()
                        + ") from the registry " + directory.getUrl().getAddress()
                        + " on the consumer " + NetUtils.getLocalHost()
                        + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                        + le.getMessage(), le);
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception: rethrow directly
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress()); // address (ip:port) of the provider whose call failed
        }
    }
    throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
            + invocation.getMethodName() + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
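
Stripped of logging and Dubbo types, the retry loop above reduces to the following minimal sketch. FailoverSketch, invokeWithRetry and pick are hypothetical names; provider addresses are plain strings, the remote call is a Function that throws on failure, and pick is a deliberately simplified replacement for the LoadBalance-based select.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of failover: try up to retries + 1 providers, remember the last error.
final class FailoverSketch {
    static String invokeWithRetry(List<String> providers, int retries, Function<String, String> call) {
        if (providers == null || providers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        int attempts = Math.max(retries + 1, 1); // same guard as len <= 0 in doInvoke
        List<String> tried = new ArrayList<>();
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            String provider = pick(providers, tried);
            tried.add(provider);
            try {
                return call.apply(provider); // first success ends the loop
            } catch (RuntimeException e) {
                last = e;                    // keep the error and move on to the next attempt
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times on providers " + tried, last);
    }

    // Simplified selection: prefer a provider that has not been tried yet.
    static String pick(List<String> providers, List<String> tried) {
        for (String provider : providers) {
            if (!tried.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }
}
```

Unlike the real doInvoke, this sketch does not distinguish business exceptions, which Dubbo rethrows immediately instead of retrying.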

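It first uses the load balancing strategy to obtain a RegistryDirectory$InvokerDelegete instance, and then sends the request (the remote call) through RegistryDirectory$InvokerDelegete.invoke on the selected instance.

// Select an invoker with the loadbalance.
// a) Select with the load balancer first. If the result is already in the selected list, or it is unavailable while the availability check is on, go to the reselect step; otherwise return it directly.
// b) Reselect rule: selected > available. The reselected result should, as far as possible, not be in selected and should be available.
// @param selected       invokers already selected. Note: the caller guarantees no duplicates.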
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
        throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    String methodName = invocation == null ? "" : invocation.getMethodName(); // get the method name
    // whether the sticky strategy is enabled for the cluster, false by default
    boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
    
    ...

    // the actual selection
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}
// the actual selection
// selected: invokers already selected; invokers: all current invokers
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    // with only two invokers, degenerate to round-robin
    if (invokers.size() == 2 && selected != null && selected.size() > 0) {
        return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
    }
    // random load balancing by default
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    // reselect if selected already contains the invoker (checked first), or if it is unavailable and availablecheck=true
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            // reselect an invoker
            Invoker<T> reselectInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (reselectInvoker != null) {
                invoker = reselectInvoker;
            } else {
                // look at the position of the first pick; if it is not the last one, take position + 1
                int index = invokers.indexOf(invoker);
                try {
                    // last resort to avoid a collision
                    invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                } catch (Exception e) {
                    logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                }
            }
        } catch (Throwable t) {
            logger.error("cluster reselect fail reason is : " + t.getMessage()
                    + ". if can not solve, you can set cluster.availablecheck=false in url", t);
        }
    }
    return invoker;
}

RandomLoadBalance.select:

// methods of AbstractLoadBalance
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (invokers == null || invokers.size() == 0)
        return null;
    if (invokers.size() == 1)
        return invokers.get(0);
    return doSelect(invokers, url, invocation);
}
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
    // weight of the invoker
    int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
    if (weight > 0) {
        long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
        if (timestamp > 0L) {
            int uptime = (int) (System.currentTimeMillis() - timestamp); // how long the provider has been running
            int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
            if (uptime > 0 && uptime < warmup) {
                weight = calculateWarmupWeight(uptime, warmup, weight);
            }
        }
    }
    return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
    return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
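
A short worked example makes the warm-up formula easier to follow. The method body below repeats the arithmetic of calculateWarmupWeight shown above; the class name WarmupWeightSketch, the 10-minute warm-up window and the weight of 100 are assumptions chosen for illustration.

```java
// Hypothetical wrapper class; the method body repeats the arithmetic of calculateWarmupWeight.
final class WarmupWeightSketch {
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
        // Linear ramp: uptime / warmup of the configured weight, clamped to [1, weight].
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    public static void main(String[] args) {
        int warmup = 10 * 60 * 1000; // assumed warm-up window of 10 minutes, in milliseconds
        int weight = 100;            // assumed configured weight
        for (int uptime : new int[] {1_000, 60_000, 300_000}) {
            System.out.println(uptime + " ms -> " + calculateWarmupWeight(uptime, warmup, weight));
        }
    }
}
```

Under these assumptions a provider that has been up for 1 second gets weight 1 (the lower clamp), after 1 minute it gets 10, and after 5 minutes it gets 50, so a freshly started provider receives proportionally less traffic until the warm-up window has passed.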

// methods of RandomLoadBalance
private final Random random = new Random();

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    int length = invokers.size(); // number of invokers
    int totalWeight = 0; // total weight
    boolean sameWeight = true; // whether every invoker has the same weight
    for (int i = 0; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation); // weight
        totalWeight += weight; // accumulate the total weight
        if (sameWeight && i > 0
            && weight != getWeight(invokers.get(i - 1), invocation)) {
            sameWeight = false; // track whether all weights are equal
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // if the weights differ and the total is greater than 0, pick randomly within the total weight
        int offset = random.nextInt(totalWeight);
        // and find which segment the random value falls into
        for (int i = 0; i < length; i++) {
            offset -= getWeight(invokers.get(i), invocation);
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // if all weights are equal or the total is 0, pick uniformly at random
    return invokers.get(random.nextInt(length));
}
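
The segment search in doSelect is easiest to check with fixed offsets. The following minimal sketch separates that search from the random draw; WeightedRandomSketch, indexForOffset and select are hypothetical names, and an int array of weights stands in for the invoker list.

```java
import java.util.Random;

// Hypothetical illustration of weighted random selection over an array of weights.
final class WeightedRandomSketch {
    // Finds the segment that contains offset, where offset lies in [0, sum of weights).
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the total weight");
    }

    static int select(int[] weights, Random random) {
        int total = 0;
        boolean sameWeight = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                sameWeight = false;
            }
        }
        if (total > 0 && !sameWeight) {
            return indexForOffset(weights, random.nextInt(total)); // weighted pick
        }
        return random.nextInt(weights.length);                     // uniform pick
    }
}
```

For weights {100, 200, 700}, offsets 0 to 99 map to index 0, offsets 100 to 299 to index 1, and offsets 300 to 999 to index 2, so each index is chosen with probability proportional to its weight.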

AbstractClusterInvoker.reselect:

private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
            throws RpcException {

    // allocate the list up front; it is always needed
    List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ?
                                                                  (invokers.size() - 1) : invokers.size());

    // first choose among the invokers that are not in selected
    if (availablecheck) { // available invokers that are not in selected
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
        }
        if (reselectInvokers.size() > 0) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    } else { // all invokers that are not in selected
        for (Invoker<T> invoker : invokers) {
            if (selected == null || !selected.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
        if (reselectInvokers.size() > 0) {
            return loadbalance.select(reselectInvokers, getUrl(), invocation);
        }
    }
    // finally choose among the available invokers in selected
    if (selected != null) {
        for (Invoker<T> invoker : selected) {
            if ((invoker.isAvailable()) // prefer available ones
                && !reselectInvokers.contains(invoker)) {
                reselectInvokers.add(invoker);
            }
        }
    }
    if (reselectInvokers.size() > 0) {
        return loadbalance.select(reselectInvokers, getUrl(), invocation);
    }
    return null;
}

Finally, look at RegistryDirectory$InvokerDelegete.invoke; the method is actually defined in its parent class InvokerWrapper:

private final Invoker<T> invoker; // ListenerInvokerWrapper instance

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

ListenerInvokerWrapper.invoke:

private final Invoker<T> invoker; // the DubboInvoker instance wrapped by Filters

public Result invoke(Invocation invocation) throws RpcException {
    return invoker.invoke(invocation);
}

A series of Filters then runs; they are covered later. Going straight to DubboInvoker.invoke: the method is actually defined in its parent class AbstractInvoker, which in turn calls DubboInvoker.doInvoke:

// AbstractInvoker.invoke()
public Result invoke(Invocation inv) throws RpcException {
    if (destroyed.get()) {
        throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion()
                + " is DESTROYED, can not be invoked any more!"
        );
    }
    RpcInvocation invocation = (RpcInvocation) inv;
    // reset the Invoker (to this DubboInvoker)
    invocation.setInvoker(this);
    // set the attachments
    if (attachment != null && attachment.size() > 0) {
        invocation.addAttachmentsIfAbsent(attachment);
    }
    Map<String, String> context = RpcContext.getContext().getAttachments();
    if (context != null) {
        invocation.addAttachmentsIfAbsent(context);
    }
    // whether to execute asynchronously
    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
    }
    // attach an invocation id for asynchronous calls
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);


    try {
        return doInvoke(invocation);
    } catch (InvocationTargetException e) { // biz exception 
        Throwable te = e.getTargetException();
        if (te == null) {
            return new RpcResult(e);
        } else {
            if (te instanceof RpcException) {
                ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
            }
            return new RpcResult(te);
        }
    } catch (RpcException e) {
        if (e.isBiz()) {
            return new RpcResult(e); // business exception
        } else {
            throw e; // rethrow non-business exceptions
        }
    } catch (Throwable e) {
        return new RpcResult(e);
    }
}
// fields and methods of DubboInvoker
private final ExchangeClient[] clients; // [ReferenceCountExchangeClient instance]

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation); // method name
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // service path, i.e. the business interface
    inv.setAttachment(Constants.VERSION_KEY, version); // service version

    // choose the client
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0]; // a single long-lived connection by default
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length]; // index increments on every call
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // asynchronous?
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // one-way, no return value needed?
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC call timeout, 1s by default
        if (isOneway) { // no return value
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent); // isSent: whether to wait until the request has been sent
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // asynchronous with a return value
            ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // synchronous with a return value
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get(); // block and wait
        }
    } catch (TimeoutException e) { // timeout
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) { // network error
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

The ExchangeClient[] clients array was already initialized in "Dubbo 服务引用之构建客户端源码解析":

ExchangeClient[] clients = [ReferenceCountExchangeClient instance] // holds several clients if multiple connections are configured

ReferenceCountExchangeClient.request:

private ExchangeClient client; // HeaderExchangeClient instance

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return client.request(request, timeout);
}

HeaderExchangeClient.request:

private final ExchangeChannel channel; // HeaderExchangeChannel instance

public ResponseFuture request(Object request, int timeout) throws RemotingException {
    return channel.request(request, timeout);
}

HeaderExchangeChannel.request:

private final Channel channel; // NettyClient instance

// Send the request.
// @param request the RpcInvocation object
// @param timeout the timeout
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0"); // Dubbo protocol version
    req.setTwoWay(true); // two-way: a response is expected
    req.setData(request); // set Data to the Invocation
    // <NettyClient, Request, timeout>
    // Netty writes data asynchronously, so DefaultFuture is used here: without waiting for the request
    // to be fully sent, a DefaultFuture is returned to the consumer. The consumer then calls
    // DefaultFuture.get() and blocks until the provider's response arrives and DefaultFuture wakes
    // the consumer thread, which turns the asynchronous write into a synchronous call.
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        channel.send(req); // channel: the NettyClient instance; sends the request
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
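
The "asynchronous write, synchronous wait" idea described in the comments above can be sketched with the JDK alone. This is a minimal illustration under stated assumptions, not Dubbo's DefaultFuture: PendingRequestsSketch, onResponse and call are hypothetical names, a CompletableFuture replaces DefaultFuture, and a background task stands in for the network write and the provider's reply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration: pending futures keyed by request id, completed when the response arrives.
final class PendingRequestsSketch {
    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    // Called by the I/O side when a response carrying the request id arrives.
    void onResponse(long requestId, Object result) {
        CompletableFuture<Object> future = pending.remove(requestId);
        if (future != null) {
            future.complete(result); // wakes the thread blocked in call()
        }
    }

    // Caller side: register the future first, hand the request to the transport, then block.
    Object call(Object payload, long timeoutMillis) {
        final long id = nextId.getAndIncrement();
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(id, future);
        // Stand-in for the asynchronous network write plus the provider's reply.
        CompletableFuture.runAsync(() -> onResponse(id, "echo:" + payload));
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for request " + id, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request " + id + " failed", e);
        } finally {
            pending.remove(id); // no-op after a normal response, cleanup after a timeout
        }
    }
}
```

Registering the future before the write matters: if the response could arrive before the future is stored, onResponse would find nothing to complete and the caller would wait until the timeout.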

The channel above is a NettyClient instance. The send called here is actually defined in AbstractPeer, the parent class of its parent class AbstractClient; AbstractPeer then calls AbstractClient.send:

// @param message the Request object
// @param sent whether to wait until the request has been sent
public void send(Object message, boolean sent) throws RemotingException {
    if (sendReconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel(); // NettyChannel object
    //TODO whether getChannel may return null still needs to be improved
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed. url:" + getUrl());
    }
    channel.send(message, sent);
}

NettyChannel.send:

private final org.jboss.netty.channel.Channel channel; // NioClientSocketChannel instance

// @param message the Request object
// @param sent whether to wait until the request has been sent
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent); // check whether the Channel is already closed

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.write(message); // asynchronous write
        if (sent) { // sent=true: wait until the request has been sent
            // timeout
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout); // wait for the write to complete
        }
        Throwable cause = future.getCause(); // the failure cause if the write failed
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
                + ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
                e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

这里就执行到了netty内部,通过netty自己的NioClientSocketChannel将消息发送给服务端。由于netty的每个channel都会有pipelineNettyClient pipelineChannelHandler有解码器、编码器及Nettyhandler。因此这里首先会经过Nettyhandler的处理(调用流程如文章前面所示):

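// Write the request out
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    // Propagate the write event downstream, i.e. encode the request and send it to the server (encoding details are covered later)
    super.writeRequested(ctx, e);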
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // handler--NettyClient
        handler.sent(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

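After the request has been sent to the server asynchronously, NettyClient.sent(channel, e.getMessage()) runs next. What is actually invoked is the sent(Channel ch, Object msg) method of its parent class AbstractPeer: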

public void sent(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    // handler--MultiMessageHandler
    handler.sent(ch, msg);
}

The call continues into MultiMessageHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--HeartBeatHandler
    handler.sent(channel, message);
}

Next comes HeartBeatHandler.sent(Channel channel, Object message), which is again the sent(Channel channel, Object message) method of the parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--AllChannelHandler
    handler.sent(channel, message);
}

Next comes AllChannelHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class WrappedChannelHandler:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--DecodeHandler instance
    handler.sent(channel, message);
}

The call continues into DecodeHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--HeaderExchangeHandler
    handler.sent(channel, message);
}
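
The sent() calls traced so far form a plain delegation chain: each handler holds a reference to the next one and forwards the call. The sketch below models only that forwarding order. It is a simplified illustration, not Dubbo's classes: the Handler interface, the Delegate class, and the run method are hypothetical, and the handler names are used purely as labels.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the sent() delegation chain; not Dubbo's real classes.
class HandlerChainSketch {

    interface Handler {
        void sent(Object message);
    }

    // A delegating handler: records its own name, then forwards to the wrapped handler.
    static final class Delegate implements Handler {
        private final String name;
        private final Handler next;
        private final List<String> trace;

        Delegate(String name, Handler next, List<String> trace) {
            this.name = name;
            this.next = next;
            this.trace = trace;
        }

        @Override
        public void sent(Object message) {
            trace.add(name);
            next.sent(message);
        }
    }

    // Builds the chain from the innermost handler outwards and returns the order in which sent() ran.
    static List<String> run(Object message) {
        List<String> trace = new ArrayList<>();
        Handler chain = msg -> trace.add("HeaderExchangeHandler"); // end of the chain
        String[] wrappers = {"DecodeHandler", "AllChannelHandler", "HeartBeatHandler", "MultiMessageHandler"};
        for (String name : wrappers) {
            chain = new Delegate(name, chain, trace);
        }
        chain.sent(message);
        return trace;
    }

    public static void main(String[] args) {
        // Prints the handler names in the order the call passes through them
        System.out.println(run("request"));
    }
}
```

Because the wrappers are applied innermost first, the outermost handler (MultiMessageHandler) is the first to see the call, matching the order described in the text.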

The call continues into HeaderExchangeHandler.sent(Channel channel, Object message):

public void sent(Channel channel, Object message) throws RemotingException {
    Throwable exception = null;
    try {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // set the write timestamp
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // bind a HeaderExchangeChannel to the NettyChannel
        try {
            handler.sent(exchangeChannel, message); // do nothing
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    } catch (Throwable t) {
        exception = t;
    }
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request); // mark the request as sent
    }
    if (exception != null) {
        if (exception instanceof RuntimeException) {
            throw (RuntimeException) exception;
        } else if (exception instanceof RemotingException) {
            throw (RemotingException) exception;
        } else {
            throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
                    exception.getMessage(), exception);
        }
    }
}

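Inside HeaderExchangeHandler.sent(Channel channel, Object message), the sent(Channel channel, Object message) method of requestHandler, the anonymous inner class in DubboProtocol, is called; it contains no logic. Next, because message is a Request object, the sent field of the DefaultFuture that corresponds to the current request is set to the current time, which records that the client has sent the request to the server.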
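
The "mark as sent" step amounts to looking up the pending future by request id and stamping it with the current time. Below is a minimal sketch of that bookkeeping under stated assumptions: the registry map, the PendingFuture class, and the method names are hypothetical and only loosely modelled on the description above, not on the actual DefaultFuture source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the "mark as sent" bookkeeping; not Dubbo's DefaultFuture.
class SentMarkSketch {

    static final class PendingFuture {
        final long id;
        volatile long sent; // 0 means the request has not been sent yet

        PendingFuture(long id) {
            this.id = id;
        }

        boolean isSent() {
            return sent > 0;
        }
    }

    // Pending futures keyed by request id
    private static final Map<Long, PendingFuture> FUTURES = new ConcurrentHashMap<>();

    // Called when a request is created, before it is written to the channel
    static PendingFuture register(long requestId) {
        PendingFuture future = new PendingFuture(requestId);
        FUTURES.put(requestId, future);
        return future;
    }

    // Called once the write has been handed to the transport: stamp the matching future
    static void sent(long requestId) {
        PendingFuture future = FUTURES.get(requestId);
        if (future != null) {
            future.sent = System.currentTimeMillis();
        }
    }
}
```

With such a timestamp in place, later code can tell a request that never left the client from one that is still waiting for the server's response.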

This completes the walkthrough of the client request flow. With that in mind, look again at the DubboInvoker.doInvoke() method:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation); // method name
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // service path, i.e. the business interface path
    inv.setAttachment(Constants.VERSION_KEY, version); // service version

    // choose the client
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length]; // index increments on every call (round robin)
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // whether the call is asynchronous
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // whether the call is one-way, i.e. no return value is needed
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC call timeout, 1s by default
        if (isOneway) { // no return value
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent); // isSent: whether to wait until the request has been sent
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // asynchronous, with a return value
            ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // synchronous, with a return value
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get(); // block and wait ###
        }
    } catch (TimeoutException e) { // timeout exception
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) { // network exception
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

At the line marked "###", currentClient.request(inv, timeout) returns a DefaultFuture, which is an implementation of ResponseFuture. How DefaultFuture.get() executes is analyzed in a later article.
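
The three branches of doInvoke (one-way, asynchronous, synchronous) and the round-robin client choice can be condensed into a small sketch. This is an illustration under assumptions, not Dubbo code: CompletableFuture stands in for ResponseFuture, the CONTEXT_FUTURE thread-local stands in for RpcContext.getContext().setFuture(...), and InvokeModeSketch, pickClient, and invoke are hypothetical names. Unlike the original expression, pickClient uses Math.floorMod so the index stays non-negative if the counter overflows.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the three invocation modes; not Dubbo's DubboInvoker.
class InvokeModeSketch {

    // Stand-in for the per-thread future that the asynchronous branch exposes to the caller
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    private static final AtomicInteger INDEX = new AtomicInteger();

    // Round-robin choice among clientCount clients; floorMod keeps the result non-negative
    static int pickClient(int clientCount) {
        return clientCount == 1 ? 0 : Math.floorMod(INDEX.getAndIncrement(), clientCount);
    }

    // Returns the response for a synchronous call, or null for one-way and asynchronous calls
    static String invoke(CompletableFuture<String> response, boolean oneway, boolean async, long timeoutMs) {
        if (oneway) { // fire and forget: nothing to wait for
            CONTEXT_FUTURE.set(null);
            return null;
        }
        if (async) { // hand the future to the caller and return immediately
            CONTEXT_FUTURE.set(response);
            return null;
        }
        CONTEXT_FUTURE.set(null);
        try {
            return response.get(timeoutMs, TimeUnit.MILLISECONDS); // block until the response arrives
        } catch (TimeoutException e) {
            throw new IllegalStateException("Invoke remote method timeout", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the response", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to invoke remote method", e.getCause());
        }
    }
}
```

As in doInvoke, the one-way check comes first, so a call flagged as both one-way and asynchronous is treated as one-way.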