Dubbo 事件通知机制分析
Dubbo
事件通知机制介绍参考: 事件通知
一、实例
两个服务:
DemoService
:真正要调用的服务;Notify
:事件通知服务(用在Consumer
端)。
provider:
package com.alibaba.dubbo.demo;
public interface DemoService {
String sayHello(String name);
}
public class DemoServiceImpl implements DemoService {
@Override
public String sayHello(String name) {
// throw new RpcException("ex, param: " + name); // 测试onthrow方法
return "Hello " + name; // 测试onreturn方法
}
}
consumer:
通知服务:Notify
public interface Notify {
void oninvoke(String name); // 调用之前
void onreturnWithoutParam(String result); // 调用之后
void onreturn(String result, String name); // 调用之后
void onthrow(Throwable ex, String name); // 出现异常
}
public class NotifyService implements Notify {
@Override
public void oninvoke(String name) {
System.out.println("======oninvoke======, param: " + name);
}
@Override
public void onreturnWithoutParam(String result) {
System.out.println("======onreturnWithoutParam======, result: " + result);
}
@Override
public void onreturn(String result, String name) {
System.out.println("======onreturn======, param: " + name + ", result: " + result);
}
@Override
public void onthrow(Throwable ex, String name) {
System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
}
}
xml配置:
<bean id="notifyService" class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" oninvoke="notifyService.oninvoke"
onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/>
</dubbo:reference>
之后就可以运行Consumer
启动类,再调用demoService.sayHello(String name)
。
注意:
- oninvoke方法:
- 必须具有与真实的被调用方法sayHello相同的入参列表:例如oninvoke(String name)
- onreturn方法:
- 至少要有一个入参且第一个入参必须与sayHello的返回类型相同,接收返回结果:例如onreturnWithoutParam(String result)
- 可以有多个参数,多个参数的情况下,第一个参数后边的所有参数都是用来接收sayHello入参的:例如 onreturn(String result, String name)
- onthrow方法:
- 至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果;例如onthrow(Throwable ex)
- 可以有多个参数,多个参数的情况下,第一个参数后边的所有参数都是用来接收sayHello入参的:例如onthrow(Throwable ex, String name)
- 如果Consumer在调用Provider的过程中,出现了异常,是不会走onthrow方法的,onthrow方法只会在Provider返回的RpcResult中含有Exception对象时,才会执行。(Dubbo中下层服务(服务提供方)的Exception会被放在响应RpcResult的exception变量中传递给上层服务(服务消费方))
二、源码分析
整个事件通知的逻辑都在FutureFilter
中,来看一下源码:
// 事件通知Filter,消费者端生效
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行
//1 调用服务之前:执行xxxService.oninvoke方法
fireInvokeCallback(invoker, invocation);
//2 调用服务
// 需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future.
Result result = invoker.invoke(invocation);
//3 调用服务之后
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
//4 返回调用结果
return result;
}
// 同步回调
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
if (result.hasException()) {
//3.1 调用服务之后:如果返回结果存在异常信息(注意:如果是consumer自己throw的异常,会在2的时候直接抛走,不会走到这里),直接执行xxxService.onthrow方法
fireThrowCallback(invoker, invocation, result.getException());
} else {
//3.2 调用服务之后:如果返回值正常,执行xxxService.onreturn方法
fireReturnCallback(invoker, invocation, result.getValue());
}
}
// 异步回调
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
if (f instanceof FutureAdapter) {
// 异步调用结果适配器
ResponseFuture responseFuture = ((FutureAdapter<?>) f).getFuture(); // 异步调用响应结果 DefaultFuture
// 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
future.setCallback(new ResponseCallback() { // 设置回调
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
// must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
// 返回结果存在异常信息
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 返回结果正常
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
// 触发调用回调,反射执行xxxService.oninvoke方法:必须具有与真实的被调用方法sayHello相同的入参列表。
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
URL url = invoker.getUrl(); // 合并参数之后的提供者url
String methodName = invocation.getMethodName();
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); // 方法
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); // 对象
// 未设置oninvoke的回调
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
// 获取真实方法sayHello传入的参数
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params); // 调用方法
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
// 触发返回回调,反射执行xxxService.onreturn方法:至少要有一个入参,接收返回结果
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
URL url = invoker.getUrl(); // 合并参数之后的提供者url
String methodName = invocation.getMethodName();
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
//not set onreturn callback
if (onReturnMethod == null && onReturnInst == null) {
return;
}
if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}
Object[] args = invocation.getArguments(); // 调用方法的参数
Object[] params; // 回调方法的参数
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); // 回调方法的参数类型
if (rParaTypes.length > 1) {
// onreturn(xx, Object[]) 两个参数:第一个参数与真实方法sayHello方法返回结果类型相同,第二个接收所有的真实请求参数
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = result; // 真实方法的执行结果
params[1] = args; // 真实方法sayHello传入的参数
} else {
// onreturn(xx, Object... args) 多个参数:第一个参数与真实方法sayHello方法返回结果类型相同,后边几个接收所有的真实请求参数
params = new Object[args.length + 1];
params[0] = result; // 真实方法的执行结果
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
// onreturn(xx) 只有一个参数:接收返回执行结果
params = new Object[]{result}; // 执行结果
}
try {
onReturnMethod.invoke(onReturnInst, params); // 调用回调方法
} catch (InvocationTargetException e) {
fireThrowCallback(invoker, invocation, e.getTargetException());
} catch (Throwable e) {
fireThrowCallback(invoker, invocation, e);
}
}
// 触发抛出异常的回调,反射执行xxxService.onthrow方法:至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
URL url = invoker.getUrl(); // 合并消费者参数之后的提供者url
String methodName = invocation.getMethodName();
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
//onthrow callback not configured
if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onthrowMethod != null && !onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); // 参数类型
if (rParaTypes[0].isAssignableFrom(exception.getClass())) { // 第一个参数是异常参数
try {
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
// onthrow(xx, Object[]) 两个参数:第一个参数接收exception,第二个接收所有的真实请求参数
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
params[0] = exception;
params[1] = args;
} else {
// onthrow(xx, Object... args) 多个参数:第一个参数接收exception,后边几个接收所有的真实请求参数
params = new Object[args.length + 1];
params[0] = exception;
System.arraycopy(args, 0, params, 1, args.length);
}
} else {
// onthrow(xx) 只有一个参数:接收exception
params = new Object[]{exception};
}
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
}
从@Activate(group = Constants.CONSUMER)
来看,FutureFilter
只用在consumer
端;不管是同步调用还是异步调用,都会走FutureFilter
。
原理:
- 首先调用
oninvoke(String name)
; - 然后调用
sayHello(String name)
; - 最后根据同步还是异步分别走不同的逻辑。
其中同步很简单,看sayHello(String name)
的返回结果RpcResult
中是否有exception
对象,如果有,执行onthrow(Throwable ex, String name)
;如果没有,执行onreturnWithoutParam(String result)
。
异步的操作:由于不知道provider
什么时候会执行完毕,所以要添加回调等待provider
端返回结果后,再执行onthrow(Throwable ex, String name)
或者onreturnWithoutParam(String result)
,这种模式很重要,这是统计异步方法调用时间的一种非常好的模式。
下面重点看下异步回调。
三、异步回调模式
// 异步回调
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
if (f instanceof FutureAdapter) {
// 异步调用结果适配器
ResponseFuture responseFuture = ((FutureAdapter<?>) f).getFuture(); // 异步调用响应结果 DefaultFuture
// 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
future.setCallback(new ResponseCallback() { // 设置回调
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
// must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
// 返回结果存在异常信息
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 返回结果正常
fireReturnCallback(invoker, invocation, result.getValue());
}
}
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
上述的responseFuture
对象是DefaultFuture
,这里首先new了一个ResponseCallback
回调实例,设置到了DefaultFuture
的ResponseCallback callback
属性中。来看一下DefaultFuture
类:
private volatile Response response; // 响应
private volatile ResponseCallback callback; // 响应回调
// 响应是否已经到达消费者端
public boolean isDone() {
return response != null;
}
public void setCallback(ResponseCallback callback) {
if (isDone()) {
// 已经获取到响应,直接调用回调对象
invokeCallback(callback);
} else {
boolean isDone = false; // 是否获取到响应
lock.lock();
try {
if (!isDone()) {
this.callback = callback;
} else {
isDone = true;
}
} finally {
lock.unlock();
}
if (isDone) {
invokeCallback(callback);
}
}
}
// 响应回调
private void invokeCallback(ResponseCallback responseCallback) {
if (responseCallback == null) {
throw new NullPointerException("callback cannot be null.");
}
Response res = response; // 响应
if (res == null) {
throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
}
// 返回正常,回调ResponseCallback回调实例的done方法
if (res.getStatus() == Response.OK) {
try {
responseCallback.done(res.getResult());
} catch (Exception e) {
logger.error("callback invoke error. result:" + res.getResult() + ", url:" + channel.getUrl(), e);
}
} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
try {
TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
// 如果超时,回调ResponseCallback回调实例的caught方法
responseCallback.caught(te);
} catch (Exception e) {
logger.error("callback invoke error. url:" + channel.getUrl(), e);
}
} else {
try {
// 其他异常,回调ResponseCallback回调实例的caught方法
RuntimeException re = new RuntimeException(res.getErrorMessage());
responseCallback.caught(re);
} catch (Exception e) {
logger.error("callback invoke error. url:" + channel.getUrl(), e);
}
}
}
从setCallback(ResponseCallback callback)
可以看出,如果此时provider
端已经返回了响应(response!=null
),则直接执行ResponseCallback
回调实例中的done
方法或者caught
方法;否则,将上边创建的ResponseCallback
实例赋值给DefaultFuture
的ResponseCallback callback
属性。那么之后会在什么时候执行回调实例的方法呢?当consumer
接收到provider
响应的时候!
// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
// 异步转同步
// @param res Response对象
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
// 触发回调
invokeCallback(callback);
}
}
当provider
返回响应时,会调用DefaultFuture.received(Channel channel, Response response)
方法(见Dubbo 客户端接收响应流程分析(异步转同步实现)),此时便会执行回调方法。到此,事件通知的原理分析完毕。最后看一个回调模式的使用场景:统计异步方法的调用时间。
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> f = RpcContext.getContext().getFuture();
final long start = System.currentTimeMillis();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
future.setCallback(new ResponseCallback() {
public void done(Object rpcResult) {
long cost = System.currentTimeMillis() - start;
}
});
}
}
上边的代码只是一个形式,实际上start
时间需要在调用sayHello
方法之前进行记录。