Dubbo 事件通知机制分析

Callback

Posted by Jay on March 18, 2019

Dubbo 事件通知机制分析

Dubbo事件通知机制介绍参考: 事件通知

一、实例

两个服务:

  • DemoService:真正要调用的服务;
  • Notify:事件通知服务(用在Consumer端)。

provider

package com.alibaba.dubbo.demo;

public interface DemoService {
	String sayHello(String name);
}

public class DemoServiceImpl implements DemoService {
    @Override
    public String sayHello(String name) {
        // throw new RpcException("ex, param: " + name); // 测试onthrow方法
				return "Hello " + name; // 测试onreturn方法
    }
}

consumer

通知服务:Notify

public interface Notify {
    void oninvoke(String name); // 调用之前

    void onreturnWithoutParam(String result); // 调用之后

    void onreturn(String result, String name); // 调用之后

    void onthrow(Throwable ex, String name);  // 出现异常
}
public class NotifyService implements Notify {
    @Override
    public void oninvoke(String name) {
        System.out.println("======oninvoke======, param: " + name);
    }

    @Override
    public void onreturnWithoutParam(String result) {
        System.out.println("======onreturnWithoutParam======, result: " + result);
    }

    @Override
    public void onreturn(String result, String name) {
        System.out.println("======onreturn======, param: " + name + ", result: " + result);
    }

    @Override
    public void onthrow(Throwable ex, String name) {
        System.out.println("======onthrow======, param: " + name + ", exception: " + ex.getMessage());
    }
}

xml配置:

<bean id="notifyService" class="com.alibaba.dubbo.demo.consumer.eventnotify.NotifyService"/>
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
    <dubbo:method name="sayHello" oninvoke="notifyService.oninvoke"
                  onreturn="notifyService.onreturnWithoutParam" onthrow="notifyService.onthrow"/>
</dubbo:reference>

之后就可以运行Consumer启动类,再调用demoService.sayHello(String name)

注意:

  • oninvoke方法:
    • 必须具有与真实的被调用方法sayHello相同的入参列表:例如oninvoke(String name)
  • onreturn方法:
    • 至少要有一个入参且第一个入参必须与sayHello的返回类型相同,接收返回结果:例如onreturnWithoutParam(String result)
    • 可以有多个参数,多个参数的情况下,第一个参数后边的所有参数都是用来接收sayHello入参的:例如 onreturn(String result, String name)
  • onthrow方法:
    • 至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果;例如onthrow(Throwable ex)
    • 可以有多个参数,多个参数的情况下,第一个参数后边的所有参数都是用来接收sayHello入参的:例如onthrow(Throwable ex, String name)
  • 如果Consumer在调用Provider的过程中,出现了异常,是不会走onthrow方法的,onthrow方法只会在Provider返回的RpcResult中含有Exception对象时,才会执行。(Dubbo中下层服务(服务提供方)的Exception会被放在响应RpcResult的exception变量中传递给上层服务(服务消费方))

二、源码分析

整个事件通知的逻辑都在FutureFilter中,来看一下源码:

// 事件通知Filter,消费者端生效
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行

        //1 调用服务之前:执行xxxService.oninvoke方法
        fireInvokeCallback(invoker, invocation);
        //2 调用服务
        // 需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future.
        Result result = invoker.invoke(invocation);
        //3 调用服务之后
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        //4 返回调用结果
        return result;
    }

    // 同步回调
    private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
        if (result.hasException()) {
            //3.1 调用服务之后:如果返回结果存在异常信息(注意:如果是consumer自己throw的异常,会在2的时候直接抛走,不会走到这里),直接执行xxxService.onthrow方法
            fireThrowCallback(invoker, invocation, result.getException());
        } else {
            //3.2 调用服务之后:如果返回值正常,执行xxxService.onreturn方法
            fireReturnCallback(invoker, invocation, result.getValue());
        }
    }

    // 异步回调
    private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
        Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
        if (f instanceof FutureAdapter) {
        	// 异步调用结果适配器
            ResponseFuture responseFuture = ((FutureAdapter<?>) f).getFuture(); // 异步调用响应结果 DefaultFuture
            // 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
            future.setCallback(new ResponseCallback() { // 设置回调
                public void done(Object rpcResult) {
                    if (rpcResult == null) {
                        logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                        return;
                    }
                    // must be rpcResult
                    if (!(rpcResult instanceof Result)) {
                        logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                        return;
                    }
                    Result result = (Result) rpcResult;
                    if (result.hasException()) {
                    	// 返回结果存在异常信息
                        fireThrowCallback(invoker, invocation, result.getException());
                    } else {
                    	// 返回结果正常
                        fireReturnCallback(invoker, invocation, result.getValue());
                    }
                }

                public void caught(Throwable exception) {
                    fireThrowCallback(invoker, invocation, exception);
                }
            });
        }
    }

    // 触发调用回调,反射执行xxxService.oninvoke方法:必须具有与真实的被调用方法sayHello相同的入参列表。
    private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    		URL url = invoker.getUrl(); // 合并参数之后的提供者url
        String methodName = invocation.getMethodName();
        final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); // 方法
        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); // 对象
        // 未设置oninvoke的回调
        if (onInvokeMethod == null && onInvokeInst == null) {
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
        // 获取真实方法sayHello传入的参数
        Object[] params = invocation.getArguments();
        try {
            onInvokeMethod.invoke(onInvokeInst, params); // 调用方法
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

    // 触发返回回调,反射执行xxxService.onreturn方法:至少要有一个入参,接收返回结果
    private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
    		URL url = invoker.getUrl(); // 合并参数之后的提供者url
        String methodName = invocation.getMethodName();
        final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
        final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));

        //not set onreturn callback
        if (onReturnMethod == null && onReturnInst == null) {
            return;
        }

        if (onReturnMethod == null || onReturnInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
            onReturnMethod.setAccessible(true);
        }

        Object[] args = invocation.getArguments(); // 调用方法的参数
        Object[] params; // 回调方法的参数
        Class<?>[] rParaTypes = onReturnMethod.getParameterTypes(); // 回调方法的参数类型
        if (rParaTypes.length > 1) {
            // onreturn(xx, Object[]) 两个参数:第一个参数与真实方法sayHello方法返回结果类型相同,第二个接收所有的真实请求参数
            if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                params = new Object[2];
                params[0] = result; // 真实方法的执行结果
                params[1] = args;   // 真实方法sayHello传入的参数
            } else {
              	// onreturn(xx, Object... args) 多个参数:第一个参数与真实方法sayHello方法返回结果类型相同,后边几个接收所有的真实请求参数
                params = new Object[args.length + 1];
                params[0] = result; // 真实方法的执行结果
                System.arraycopy(args, 0, params, 1, args.length);
            }
        } else {
            // onreturn(xx) 只有一个参数:接收返回执行结果
            params = new Object[]{result}; // 执行结果
        }
        try {
            onReturnMethod.invoke(onReturnInst, params); // 调用回调方法
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);
        }
    }

    // 触发抛出异常的回调,反射执行xxxService.onthrow方法:至少要有一个入参且第一个入参类型为Throwable或其子类,接收返回结果
    private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
    	URL url = invoker.getUrl(); // 合并消费者参数之后的提供者url
        String methodName = invocation.getMethodName();
        final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
        final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));

        //onthrow callback not configured
        if (onthrowMethod == null && onthrowInst == null) {
            return;
        }
        if (onthrowMethod == null || onthrowInst == null) {
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
        }
        if (onthrowMethod != null && !onthrowMethod.isAccessible()) {
            onthrowMethod.setAccessible(true);
        }
        Class<?>[] rParaTypes = onthrowMethod.getParameterTypes(); // 参数类型
        if (rParaTypes[0].isAssignableFrom(exception.getClass())) { // 第一个参数是异常参数
            try {
                Object[] args = invocation.getArguments();
                Object[] params;

                if (rParaTypes.length > 1) {
                    // onthrow(xx, Object[]) 两个参数:第一个参数接收exception,第二个接收所有的真实请求参数
                    if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
                        params = new Object[2];
                        params[0] = exception;
                        params[1] = args;
                    } else {
                      	// onthrow(xx, Object... args) 多个参数:第一个参数接收exception,后边几个接收所有的真实请求参数
                        params = new Object[args.length + 1];
                        params[0] = exception;
                        System.arraycopy(args, 0, params, 1, args.length);
                    }
                } else {
                    // onthrow(xx) 只有一个参数:接收exception
                    params = new Object[]{exception};
                }
                onthrowMethod.invoke(onthrowInst, params);
            } catch (Throwable e) {
                logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
            }
        } else {
            logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
        }
    }
}

@Activate(group = Constants.CONSUMER)来看,FutureFilter只用在consumer端;不管是同步调用还是异步调用,都会走FutureFilter

原理:

  • 首先调用oninvoke(String name)
  • 然后调用sayHello(String name)
  • 最后根据同步还是异步分别走不同的逻辑。

其中同步很简单,看sayHello(String name)的返回结果RpcResult中是否有exception对象,如果有,执行onthrow(Throwable ex, String name);如果没有,执行onreturnWithoutParam(String result)

异步的操作:由于不知道provider什么时候会执行完毕,所以要添加回调等待provider端返回结果后,再执行onthrow(Throwable ex, String name)或者onreturnWithoutParam(String result),这种模式很重要,这是统计异步方法调用时间的一种非常好的模式

下面重点看下异步回调。

三、异步回调模式

// 异步回调
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
    if (f instanceof FutureAdapter) {
    	// 异步调用结果适配器
        ResponseFuture responseFuture = ((FutureAdapter<?>) f).getFuture(); // 异步调用响应结果 DefaultFuture
        // 3.1 调用服务之后:设置回调ResponseCallback对象到DefaultFuture中,当provider返回响应时,执行DefaultFuture.doReceived方法,该方法会调用ResponseCallback对象的done或者caught方法
        future.setCallback(new ResponseCallback() { // 设置回调
            public void done(Object rpcResult) {
                if (rpcResult == null) {
                    logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
                    return;
                }
                // must be rpcResult
                if (!(rpcResult instanceof Result)) {
                    logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
                    return;
                }
                Result result = (Result) rpcResult;
                if (result.hasException()) {
                	// 返回结果存在异常信息
                    fireThrowCallback(invoker, invocation, result.getException());
                } else {
                	// 返回结果正常
                    fireReturnCallback(invoker, invocation, result.getValue());
                }
            }

            public void caught(Throwable exception) {
                fireThrowCallback(invoker, invocation, exception);
            }
        });
    }
}

上述的responseFuture对象是DefaultFuture,这里首先new了一个ResponseCallback回调实例,设置到了DefaultFutureResponseCallback callback属性中。来看一下DefaultFuture类:

private volatile Response response; // 响应
private volatile ResponseCallback callback; // 响应回调

// 响应是否已经到达消费者端
public boolean isDone() {
  	return response != null;
}

public void setCallback(ResponseCallback callback) {
  	if (isDone()) {
    	// 已经获取到响应,直接调用回调对象
    	invokeCallback(callback);
  	} else {
    	boolean isDone = false; // 是否获取到响应
    	lock.lock();
    	try {
      	if (!isDone()) {
        	this.callback = callback;
      	} else {
        	isDone = true;
      	}
    	} finally {
      	lock.unlock();
    	}
    	if (isDone) {
      	invokeCallback(callback);
    	}
  	}
}
// 响应回调
private void invokeCallback(ResponseCallback responseCallback) {
    if (responseCallback == null) {
        throw new NullPointerException("callback cannot be null.");
    }
    Response res = response; // 响应
    if (res == null) {
        throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());
    }
	
  	// 返回正常,回调ResponseCallback回调实例的done方法
    if (res.getStatus() == Response.OK) {
        try {
            responseCallback.done(res.getResult());
        } catch (Exception e) {
            logger.error("callback invoke error. result:" + res.getResult() + ", url:" + channel.getUrl(), e);
        }
    } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        try {
            TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
	          // 如果超时,回调ResponseCallback回调实例的caught方法
            responseCallback.caught(te);
        } catch (Exception e) {
            logger.error("callback invoke error. url:" + channel.getUrl(), e);
        }
    } else {
        try {
          	// 其他异常,回调ResponseCallback回调实例的caught方法
            RuntimeException re = new RuntimeException(res.getErrorMessage());
            responseCallback.caught(re);
        } catch (Exception e) {
            logger.error("callback invoke error. url:" + channel.getUrl(), e);
        }
    }
}

setCallback(ResponseCallback callback)可以看出,如果此时provider端已经返回了响应(response!=null),则直接执行ResponseCallback回调实例中的done方法或者caught方法;否则,将上边创建的ResponseCallback实例赋值给DefaultFutureResponseCallback callback属性。那么之后会在什么时候执行回调实例的方法呢?当consumer接收到provider响应的时候!

// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
// 异步转同步
// @param res Response对象
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res;
        if (done != null) {
            done.signal();
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        // 触发回调
        invokeCallback(callback);
    }
}

provider返回响应时,会调用DefaultFuture.received(Channel channel, Response response)方法(见Dubbo 客户端接收响应流程分析(异步转同步实现)),此时便会执行回调方法。到此,事件通知的原理分析完毕。最后看一个回调模式的使用场景:统计异步方法的调用时间。

private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> f = RpcContext.getContext().getFuture();
    final long start = System.currentTimeMillis();
    if (f instanceof FutureAdapter) {
        ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
        future.setCallback(new ResponseCallback() {
            public void done(Object rpcResult) {
                long cost = System.currentTimeMillis() - start;
            }
        });
    }
}

上边的代码只是一个形式,实际上start时间需要在调用sayHello方法之前进行记录。