Dubbo 客户端发起请求过程分析
客户端请求代码如下:
DemoService demoService = (DemoService) context.getBean("demoService"); // 获取远程服务代理
String hello = demoService.sayHello("world"); // 执行远程方法
Dubbo 服务引用之构建客户端源码解析这篇文章中已经分析了最终得到的demoService
是一个com.alibaba.dubbo.common.bytecode.proxy0
代理对象,下面分析第二行代码。
一、客户端请求总体流程
//代理发出请求
proxy0.sayHello(String paramString)
-->InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)
-->new RpcInvocation(method, args)
-->MockClusterInvoker.invoke(Invocation invocation)//服务降级的地方
//ClusterInvoker将多个Invoker伪装成一个集群版的Invoker
-->AbstractClusterInvoker.invoke(final Invocation invocation)
//获取Invokers
-->list(Invocation invocation)
-->AbstractDirectory.list(Invocation invocation)
-->RegistryDirectory.doList(Invocation invocation)//从Map<String, List<Invoker<T>>> methodInvokerMap中获取key为sayHello的List<Invoker<T>>
-->MockInvokersSelector.getNormalInvokers(final List<Invoker<T>> invokers)//对上述的List<Invoker<T>>再进行一次过滤(这里比如说过滤出所有协议为mock的Invoker,如果一个也没有就全部返回),这就是router的作用
//获取负载均衡器
-->loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE))//默认为random
-->RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation)//异步操作添加invocationID
-->FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
//使用负载均衡器选择一个Invoker出来:RegistryDirectory$InvokerDelegete实例
-->AbstractClusterInvoker.select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
-->AbstractLoadBalance.select(List<Invoker<T>> invokers, URL url, Invocation invocation)
-->RandomLoadBalance.doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation)
//执行Filter链
-->ListenerInvokerWrapper.invoke
-->ConsumerContextFilter.invoke(Invoker<?> invoker, Invocation invocation)//设置一些RpcContext属性,并且设置invocation中的invoker属性
-->FutureFilter.invoke(Invocation invocation)
-->MonitorFilter.invoke(Invocation invocation)//monitor在这里收集数据
-->AbstractInvoker.invoke(Invocation inv)//重新设置了invocation中的invoker属性和attachment属性
-->DubboInvoker.doInvoke(final Invocation invocation)
//获取ExchangeClient进行消息的发送
-->ReferenceCountExchangeClient.request(Object request, int timeout),request--RpcInvocation
-->HeaderExchangeClient.request(Object request, int timeout)
-->HeaderExchangeChannel.request(Object request, int timeout)
-->AbstractPeer.send(Object message) message--Request
-->AbstractClient.send(Object message, boolean sent) //NettyClient的父类
-->getChannel() //NettyChannel实例,其内部channel实例=NioClientSocketChannel实例
-->NettyChannel.send(Object message, boolean sent)
-->NioClientSocketChannel.write(Object message)//已经是netty的东西,这里的message=Request实例:最重要的是RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
-->NettyHandler.writeRequested(ChannelHandlerContext ctx, MessageEvent e) //写出请求
-->NettyClient.sent(NettyChannel, e.getMessage()--Request)
-->AbstractPeer.sent(NettyChannel, Request)
-->MultiMessageHandler.sent(NettyChannel, Request)
-->HeartBeatHandler.sent(NettyChannel, Request)
-->AllChannelHandler.sent(NettyChannel, Request)
-->DecodeHandler.sent(NettyChannel, Request)
-->HeaderExchangeHandler.sent(NettyChannel, Request)
-->DubboProtocol.requestHandler.sent(HeaderExchangeChannel, Request)
-->DefaultFuture.sent(channel, request) //标记发送状态
总体流程如下:
- 将请求参数(方法名,方法参数类型,方法参数值,服务名,附加参数)封装成一个
RpcInvocation
;- 附加参数中的
path
:即接口名,将会用于服务端接收请求信息后从exportMap
中选取DubboExporter
实例; - 方法名,方法参数类型,方法参数值:将用于
JavassistProxyFactory$AbstractProxyInvoker
执行对应的方法。
- 附加参数中的
- 使用
RegistryDirectory
从Map<String, List<Invoker<T>>> methodInvokerMap
中获取key
为sayHello
(指定方法名)的List<Invoker<T>>
; - 使用
Router
对上述的List<Invoker<T>>
再进行一次过滤,得到新的invoker
列表; - 使用
LoadBalance
策略从新的invoker
列表中选择一个Invoker
,实际上是InvokerDelegete
实例; - 使用
InvokerDelegete
实例执行真正的DubboInvoker
的Filter
链,然后执行到真正的DubboInvoker
; DubboInvoker
使用NettyClient
向服务端发出了请求。
二、源码分析
首先看proxy0.sayHello
:
public String sayHello(String string) {
Object[] arrobject = new Object[]{string};
Object object = this.handler.invoke(this, DemoService.class.getMethod("sayHello"), arrobject);
return (String)object;
}
这里的handler
是InvokerInvocationHandler
实例。
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker<?> invoker; // MockClusterInvoker实例
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
// 返回调用结果
// @param proxy 服务代理类(com.alibaba.dubbo.common.bytecode.proxy0实例)
// @param method 调用的方法
// @param args 方法参数
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// Object方法,直接调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// RPC调用出发点,构建Invocation
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
首先将请求参数封装成一个RpcInvocation
实例,如下:
-->String methodName=sayHello
-->Class<?>[] parameterTypes=[class java.lang.String]
-->Object[] arguments=[world]
-->Map<String, String> attachments={}
之后使用MockClusterInvoker.invoke(Invocation invocation)
进行远程调用:
private final Directory<T> directory; // RegistryDirectory实例
private final Invoker<T> invoker; // FailoverClusterInvoker实例
// invocation--RpcInvocation实例
// 这里实际上会根据配置的mock参数来做服务降级:
// 1.如果没有配置mock参数或者mock=false,则进行远程调用;
// 2.如果配置了mock=force:return null,则直接返回null,不进行远程调用;
// 3.如果配置了mock=fail:return null,先进行远程调用,失败了再进行mock调用。
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
// sayHello.mock->mock->default.mock
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); // false
if (value.length() == 0 || value.equalsIgnoreCase("false")) {
//no mock 不是mock,直接调用
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock 直接mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock 调用失败后mock
try {
result = this.invoker.invoke(invocation);
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}
return result;
}
注意:这里可以做服务降级,参考服务降级。
之后调用FailoverClusterInvoker.invoke
方法,该方法在其父类AbstractClusterInvoker
中:
protected final Directory<T> directory; // RegistryDirectory
// invocation RpcInvocation实例,执行远程调用
public Result invoke(final Invocation invocation) throws RpcException {
// 检查是否已经销毁
checkWhetherDestroyed();
// 负载均衡
LoadBalance loadbalance;
// 获取invocation(调用方法)对应的可用的invoker(经过路由router)
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
// invokers.get(0).getUrl()----合并消费者参数之后的提供者url RandomLoadBalance
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 异步操作默认添加invocation id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 真正的调用
return doInvoke(invocation, invokers, loadbalance);
}
首先是获取一个List<Invoker<T>>
,之后获取LoadBalance
策略,最后调用doInvoke
进行调用。
首先来看RegistryDirectory.list(Invocation invocation)
,该方法在RegistryDirectory
的父类AbstractDirectory
中:
// 获取invocation(调用方法)对应的可用的invoker(经过路由router)
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation); // 列举invoker
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && localRouters.size() > 0) {
for (Router router : localRouters) {
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
// router: MockInvokersSelector实例
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
首先执行doList(invocation)
方法获取出List<Invoker<T>>
,之后使用router
循环过滤,最后返回过滤后的List<Invoker<T>>
。
RegistryDirectory.doList(invocation)
:
// 根据方法,列出符合的invoker
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. 没有服务提供者 2. 服务提供者被禁用
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation); // 方法名
Object[] args = RpcUtils.getArguments(invocation); // 方法参数
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) { // 第一个参数是字符串或枚举
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由 sayHello.world
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
其中Map<String, List<Invoker<T>>> methodInvokerMa
p在Dubbo 服务引用之构建客户端源码解析中已经初始化好了:
Map<String, List<Invoker<T>>> methodInvokerMap={
sayHello=[RegistryDirectory$InvokerDelegete实例], *=[RegistryDirectory$InvokerDelegete实例]}
这里根据方法名sayHello
取出一个RegistryDirectory$InvokerDelegete
实例。最后通过Router
进行过滤,这里只有一个Router
,就是MockInvokersSelector
。
// invokers--RegistryDirectory列举出来的invokers
// url--消费者url
// invocation--RpcInvocation
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,
URL url, final Invocation invocation) throws RpcException {
if (invocation.getAttachments() == null) {
return getNormalInvokers(invokers);
} else {
String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
if (value == null)
return getNormalInvokers(invokers); // 去除mock协议的invoker,返回
else if (Boolean.TRUE.toString().equalsIgnoreCase(value)) {
return getMockedInvokers(invokers); // 获取mock协议的provider
}
}
return invokers;
}
private <T> List<Invoker<T>> getNormalInvokers(final List<Invoker<T>> invokers) {
if (!hasMockProviders(invokers)) {
return invokers; // 没有mock协议的provider,直接返回
} else {
List<Invoker<T>> sInvokers = new ArrayList<Invoker<T>>(invokers.size());
for (Invoker<T> invoker : invokers) {
if (!invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)) {
sInvokers.add(invoker);
}
}
return sInvokers; // 去除mock协议的invoker,返回
}
}
这里直接返回了。到此就已经选出可以被调用的RegistryDirectory$InvokerDelegete
实例列表了。记下来先获取负载均衡策略的实现,默认是RandomLoadBalance
。最后执行FailoverClusterInvoker.doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance)
:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
// 调用次数(重试次数+1),默认是2+1次
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception. 上次调用的异常
// 调用过的invoker
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers. 已经调用过的invokers
Set<String> providers = new HashSet<String>(len); // 调用失败的provider地址ip:port
for (int i = 0; i < len; i++) {
//重试时,进行重新选择,避免重试时invoker列表已发生变化.
//注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation); // 重新从RegistryDirectory获取invokers
checkInvokers(copyInvokers, invocation); // 重新检查一下
}
// 负载均衡选择invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker); // 添加到调用过的invoker列表
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) { // 上一次调用出现异常
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception. 业务异常,直接抛出
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress()); // 调用失败的provider地址ip:port
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
首先利用负载均衡策略获取一个RegistryDirectory$InvokerDelegete
实例,然后使用选出的RegistryDirectory$InvokerDelegete.invoke
进行请求发送(远程调用)。
// 使用loadbalance选择invoker.
// a)先lb选择,如果在selected列表中 或者 不可用且做检验时,进入下一步(重选),否则直接返回
// b)重选验证规则:selected > available .保证重选出的结果尽量不在select中,并且是可用的
// @param selected 已选过的invoker.注意:输入保证不重复
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected)
throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName(); // 获取方法名
// 集群时是否启用sticky策略,默认false
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
...
// 真正的选择过程
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
// 真正的选择过程
// selected--已选择过的invoker,invokers--目前全部的invoker
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 如果只有两个invoker,退化成轮循
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// 默认随机负载均衡
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果 selected 中包含(优先判断) 或者 不可用&&availablecheck=true 则重选.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
// 重选invoker
Invoker<T> reselectInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (reselectInvoker != null) {
invoker = reselectInvoker;
} else {
//看下第一次选的位置,如果不是最后,选+1位置.
int index = invokers.indexOf(invoker);
try {
//最后在避免碰撞
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("cluster reselect fail reason is : " + t.getMessage()
+ ". if can not solve, you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
RandomLoadBalance.select
:
// AbstractLoadBalance方法
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// invoker权重
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp); // 已运行时间
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
// RandomLoadBalance方法
private final Random random = new Random();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 总个数
int totalWeight = 0; // 总权重
boolean sameWeight = true; // 权重是否都一样
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation); // 权重
totalWeight += weight; // 累计总权重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 计算所有权重是否一样
}
}
if (totalWeight > 0 && !sameWeight) {
// 如果权重不相同且权重大于0则按总权重数随机
int offset = random.nextInt(totalWeight);
// 并确定随机值落在哪个片断上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果权重相同或权重为0则均等随机
return invokers.get(random.nextInt(length));
}
AbstractClusterInvoker.reselect
:
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
throws RpcException {
//预先分配一个,这个列表是一定会用到的.
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ?
(invokers.size() - 1) : invokers.size());
//先从非select中选
if (availablecheck) { //选isAvailable 的非select
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
} else { //选全部非select
for (Invoker<T> invoker : invokers) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
//最后从select中选可用的isAvailable.
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) //优先选available
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (reselectInvokers.size() > 0) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
return null;
}
最后来看RegistryDirectory$InvokerDelegete.invoke
,该方法实际在其父类InvokerWrapper
中:
private final Invoker<T> invoker; // ListenerInvokerWrapper实例
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
ListenerInvokerWrapper.invoke
:
private final Invoker<T> invoker; // Filter包裹后的DubboInvoker实例
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
之后就会执行一系列的Filter
,这些Filter
后续会讲,现在直接执行到DubboInvoker.invoke
,实际上该方法在其父类AbstractInvoker
中,AbstractInvoker
又调用了DubboInvoker.doInvoke
:
// AbstractInvoker.invoke()
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
+ " use dubbo version " + Version.getVersion()
+ " is DESTROYED, can not be invoked any more!"
);
}
RpcInvocation invocation = (RpcInvocation) inv;
// 重新设置Invoker(DubboInvoker)
invocation.setInvoker(this);
// 设置attachments
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
// 是否异步执行
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
// 异步操作添加 invocation id
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e); // 业务异常
} else {
throw e; // 非业务异常抛出
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
// DubboInvoker属性和方法
private final ExchangeClient[] clients; // [ReferenceCountExchangeClient实例]
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation); // 方法名
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version
// 确定客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0]; // 默认单一长连接
} else {
currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步
boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
if (isOneway) { // 无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { // 异步有返回值
ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { // 同步有返回值
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待
}
} catch (TimeoutException e) { // 超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) { // 网络异常
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
其中ExchangeClient[] clients
在Dubbo 服务引用之构建客户端源码解析中已经被初始化好了:
ExchangeClient[] clients = [ReferenceCountExchangeClient实例] // 如果设置了多条连接,此处有多个client
ReferenceCountExchangeClient.request
:
private ExchangeClient client; // HeaderExchangeClient实例
public ResponseFuture request(Object request, int timeout) throws RemotingException {
return client.request(request, timeout);
}
HeaderExchangeClient.request
:
private final ExchangeChannel channel; // HeaderExchangeChannel实例
public ResponseFuture request(Object request, int timeout) throws RemotingException {
return channel.request(request, timeout);
}
HeaderExchangeChannel.request
:
private final Channel channel; // NettyClient实例
// 发送请求
// @param request RpcInvocation对象
// @param timeout 超时时间
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0"); // Dubbo 协议版本
req.setTwoWay(true); // 是否双向,有返回值
req.setData(request); // 设置Data为Invocation
// <NettyClient, Request, timeout>
// 由于netty写出数据是个异步过程,因此这边借助DefaultFuture,不等请求发送完毕,直接给消费者返回
// DefaultFuture,然后消费者调用DefaultFuture.get()阻塞等待,直到服务端返回响应,通过
// DefaultFuture唤醒消费者线程,实现了"异步转同步"。
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req); // channel--NettyClient实例,发送请求
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
上边的channel
是NettyClient
实例,这里的send
实际上是调用其父类AbstractClient
的父类AbstractPeer
,AbstractPeer
调用AbstractClient.send
:
// @param message Request对象
// @param sent 是否等待请求发送完毕
public void send(Object message, boolean sent) throws RemotingException {
if (sendReconnect && !isConnected()) {
connect();
}
Channel channel = getChannel(); // NettyChannel对象
//TODO getChannel返回的状态是否包含null需要改进
if (channel == null || !channel.isConnected()) {
throw new RemotingException(this, "message can not send, because channel is closed. url:" + getUrl());
}
channel.send(message, sent);
}
NettyChannel.send
:
private final org.jboss.netty.channel.Channel channel; // NioClientSocketChannel实例
// @param message Request对象
// @param sent 是否等待请求发送完毕
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent); // 检查Channel是否已经关闭
boolean success = true;
int timeout = 0;
try {
ChannelFuture future = channel.write(message); // 异步请求
if (sent) { // sent=true,等待请求发送完成
// 超时
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout); // 等待写操作完成
}
Throwable cause = future.getCause(); // 若写操作失败,得到失败异常
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
+ ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
这里就执行到了netty
内部,通过netty
自己的NioClientSocketChannel
将消息发送给服务端。由于netty
的每个channel
都会有pipeline
,NettyClient pipeline
的ChannelHandler
有解码器、编码器及Nettyhandler
。因此这里首先会经过Nettyhandler
的处理(调用流程如文章前面所示):
// 写出请求
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// 将写出数据的事件往下传播,即将请求编码后发送到服务端(这里编码细节后续再讲)
super.writeRequested(ctx, e);
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// handler--NettyClient
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
请求异步发送到服务端后,接下来执行NettyClient.sent(channel, e.getMessage())
,实际上调用的是其父类AbstractPeer
的sent(Channel ch, Object msg)
方法:
public void sent(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
// handler--MultiMessageHandler
handler.sent(ch, msg);
}
继续调用MultiMessageHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--HeartBeatHandler
handler.sent(channel, message);
}
接着调用HeartBeatHandler.sent(Channel channel, Object message)
,实际上调用的也是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--AllChannelHandler
handler.sent(channel, message);
}
接着调用AllChannelHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类WrappedChannelHandler.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--DecodeHandler实例
handler.sent(channel, message);
}
继续调用DecodeHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--HeaderExchangeHandler
handler.sent(channel, message);
}
继续调用HeaderExchangeHandler.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // 设置写时间戳
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // NettyChannel绑定一个HeaderExchangeChannel
try {
handler.sent(exchangeChannel, message); // do nothing
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request); // 标记发送状态
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
HeaderExchangeHandler.sent(Channel channel, Object message)
方法中,调用了DubboProtocol
匿名内部类requestHandler
的sent(Channel channel, Object message)
,这里没有什么逻辑。接着,由于message
是Request
对象,因此会将当前请求对应的DefaultFuturet
的sent
变量赋值为当前时间,表示客户端已经将请求发送给服务端。
到此,客户端请求流程介绍完毕。此时,再来看DubboInvoker.doInvoker()
方法:
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation); // 方法名
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version
// 确定客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步
boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
if (isOneway) { // 无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { // 异步有返回值
ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { // 同步有返回值
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待 ###
}
} catch (TimeoutException e) { // 超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) { // 网络异常
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
在“###”处,ccurrentClient.request(inv, timeout)
返回的是DefaultFuture
,即ResponseFuture
实现。此外,DefaultFuture.get()
的执行后续文章分析。