Dubbo 异步调用原理分析

Async Call

Posted by Jay on March 18, 2019

Dubbo 异步调用原理分析

Dubbo 客户端发起请求过程分析Dubbo 服务端接收请求并发送响应流程分析Dubbo 客户端接收响应流程分析(异步转同步实现)这三篇文章分析了Dubbo同步调用的流程,下面来看Dubbo异步调用的原理。

一、使用方式

服务提供方不变,消费方代码如下:

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
  	<dubbo:method name="sayHello" async="true"/>
  	<dubbo:method name="sayBye" async="true"/>
</dubbo:reference>

配置里添加<dubbo:method name="xxx" async="true"/>,表示单个方法xxx使用异步方式;如果DemoService下的所有方法都使用异步,直接配置为<dubbo:reference async="true"/>

public static void main(String[] args) throws Exception {
  	System.setProperty("java.net.preferIPv4Stack", "true");
//  asyncFuture1();
  	asyncFuture2();
}

public static void asyncFuture1() throws ExecutionException, InterruptedException {
  	ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-withzk-consumer.xml"});
  	context.start();
  	DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

  	long start = System.currentTimeMillis();

  	demoService.sayHello("zhangsan");
  	Future<String> helloFuture = RpcContext.getContext().getFuture();

  	demoService.sayBye("lisi");
  	Future<String> byeFuture = RpcContext.getContext().getFuture();

  	final String helloStr = helloFuture.get();//消耗5s
  	final String byeStr = byeFuture.get();//消耗8s

  	System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));//总消耗8s
}
public static void asyncFuture2() throws ExecutionException, InterruptedException {
  	ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-withzk-consumer.xml"});
  	context.start();
  	DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

  	long start = System.currentTimeMillis();

  	Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
  	Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi"));

  	final String helloStr = helloFuture.get();//消耗5s
  	final String byeStr = byeFuture.get();//消耗8s

  	System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
}

以上是Consumer启动主类的代码。其中asyncFuture2()方法是推荐用法,注意Callable(asyncCall方法的入参)只是一个任务task,不会新建线程;所以asyncFuture2()asyncFuture1()相似,资源占用相同,都是用一个线程进行异步操作的。

二、asyncFuture1分析

先来看asyncFuture1(),总体步骤如下:

  • demoService.sayHello("zhangsan"); 创建一个Future对象(FutureAdapter),存入当前线程的上下文中;
  • Future<String> helloFuture = RpcContext.getContext().getFuture(); 从当前线程的上下文中获取第一步存入的Future对象;
  • final String helloStr = helloFuture.get(); 阻塞等待,从Future中获取结果。

代码主要执行流程如下(代码详细执行流程参考文章开头的三篇文章):

1.demoService.sayHello(“zhangsan”);
-->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
   -->DubboInvoker.doInvoke(final Invocation invocation)

FutureFilter:

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
  	final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行

  	fireInvokeCallback(invoker, invocation);
  	//需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future. 
  	Result result = invoker.invoke(invocation); // 发起调用
  	if (isAsync) {
    	asyncCallback(invoker, invocation);
  	} else {
    	syncCallback(invoker, invocation, result);
  	}
  	return result;
}

对于如上的异步操作(asyncFuture1()asyncFuture2()),FutureFilter没起任何作用,该Filter主要会用在事件通知中,后续再说。

DubboInvoker.doInvoke(final Invocation invocation)

protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation); // 方法名
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
    inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version

    // 确定客户端
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步 
        boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
        if (isOneway) { // 无论同步或异步,无返回值
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // 异步有返回值
            ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // 同步有返回值
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待
        }
    } catch (TimeoutException e) { // 超时异常
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) { // 网络异常
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

逻辑:

  • 如果是isOneway(不需要返回值),不管同步还是异步,请求直接发出,不会创建Future,直接返回RpcResult空对象。
  • 如果是isAsync(异步),则
    • 先创建ResponseFuture对象,之后使用FutureAdapter包装该ResponseFuture对象;(创建ResponseFuture对象与同步的代码相同,最后得到的是一个DefaultFuture对象);
    • 然后将该FutureAdapter对象设入当前线程的上下文中RpcContext.getContext();
    • 最后返回空的RpcResult
  • 如果是同步,则先创建ResponseFuture对象,之后直接调用其get()方法进行阻塞调用。

简单来看一下FutureAdapter

public class FutureAdapter<V> implements Future<V> {

    // 异步调用响应结果 DefaultFuture
    private final ResponseFuture future;

    public FutureAdapter(ResponseFuture future) {
        this.future = future;
    }

    public ResponseFuture getFuture() {
        return future;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return future.isDone();
    }

  	// 获取结果
    @Override
    @SuppressWarnings("unchecked")
    public V get() throws InterruptedException, ExecutionException {
        try {
            return (V) (((Result) future.get()).recreate());
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }

  	// 获取结果
    @Override
    @SuppressWarnings("unchecked")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        int timeoutInMillis = (int) TimeUnit.MILLISECONDS.convert(timeout, unit);
        try {
            return (V) (((Result) future.get(timeoutInMillis)).recreate());
        } catch (com.alibaba.dubbo.remoting.TimeoutException e) {
            throw new TimeoutException(StringUtils.toString(e));
        } catch (RemotingException e) {
            throw new ExecutionException(e.getMessage(), e);
        } catch (Throwable e) {
            throw new RpcException(e);
        }
    }

}

最后,回头看一下FutureFilter

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
  	final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行

  	fireInvokeCallback(invoker, invocation);
  	//需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future. 
  	Result result = invoker.invoke(invocation); // 发起调用
  	if (isAsync) {
    	asyncCallback(invoker, invocation);
  	} else {
      syncCallback(invoker, invocation, result);
  	}
  	return result;
}
// 异步回调。
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
    Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
    if (future instanceof FutureAdapter) {
        // 异步调用结果适配器
        ResponseFuture responseFuture = ((FutureAdapter<?>) future).getFuture(); // 异步调用响应结果 DefaultFuture
        responseFuture.setCallback(new ResponseCallback() { // 设置回调
            // 响应结果回调
            @Override
            public void done(Object rpcResult) {
                if (rpcResult == null) {
                    logger.error(new IllegalStateException("invalid result value: null, expected "
                            + Result.class.getName()));
                    return;
                }
                // must be rpcResult
                if (!(rpcResult instanceof Result)) {
                    logger.error(new IllegalStateException("invalid result type: " + rpcResult.getClass()
                            + ", expected " + Result.class.getName()));
                    return;
                }
                Result result = (Result) rpcResult;
                if (result.hasException()) {
                    // 异步调用发生异常
                    fireThrowCallback(invoker, invocation, result.getException());
                } else {
                    // 返回时的回调
                    fireReturnCallback(invoker, invocation, result.getValue());
                }
            }

            @Override
            public void caught(Throwable exception) {
                fireThrowCallback(invoker, invocation, exception);
            }
        });
    }
}

这里的future对象是之前创建好的FutureAdapter对象,responseFutureFutureAdapter包含的DefaultFuture对象。DefaultFuture代码片段如下:

private volatile Response response; // 响应
private volatile ResponseCallback callback; // 响应回调

// 响应是否已经到达消费者端
public boolean isDone() {
  	return response != null;
}

public void setCallback(ResponseCallback callback) {
  	if (isDone()) {
    	// 已经获取到响应,直接调用回调对象
    	invokeCallback(callback);
  	} else {
    	boolean isDone = false; // 是否获取到响应
    	lock.lock();
    	try {
      	if (!isDone()) {
        	this.callback = callback;
      	} else {
        	isDone = true;
      	}
    	} finally {
      	lock.unlock();
    	}
    	if (isDone) {
      	invokeCallback(callback);
    	}
  	}
}

这里判断响应是否已经返回,如果返回了,直接执行invokeCallback(callback),否则将传入的ResponseCallback对象赋值给callback变量。

2.Future<String> helloFuture = RpcContext.getContext().getFuture();

RpcContext

private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
    @Override
    protected RpcContext initialValue() {
        return new RpcContext();
    }
};

private Future<?> future;

public static RpcContext getContext() {
    return LOCAL.get();
}

public <T> Future<T> getFuture() {
    return (Future<T>) future;
}

从当前线程上下文中获取之前存进去的FutureAdapter对象。

3.final String helloStr = helloFuture.get();

helloFuture是上述的FutureAdapter对象,其get()调用的是内部的DefaultFutureget(),该方法与同步调用时相同,源码分析见文章开头的三篇文章。

public V get() throws InterruptedException, ExecutionException {
    try {
        return (V) (((Result) future.get()).recreate());
    } catch (RemotingException e) {
        throw new ExecutionException(e.getMessage(), e);
    } catch (Throwable e) {
        throw new RpcException(e);
    }
}

get方法的超时设置除了直接在xml中配置之外,还可以在代码中手动执行(优先级高)。

final String helloStr2 = helloFuture.get(3000, TimeUnit.MILLISECONDS);

三、asyncFuture2()分析

下面来看一下asyncFuture2()的源码:

1.Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello(“zhangsan”));
public <T> Future<T> asyncCall(Callable<T> callable) {
    try {
        try {
            // Java代码设置异步async=true
            setAttachment(Constants.ASYNC_KEY, Boolean.toString(true));
            // 执行传入的任务(此处创建FutureAdapter对象,并且设置到当前线程的RpcContext的future变量中)
            final T result = callable.call(); // RPC异步调用,返回null
            // local调用会直接返回结果.
            if (result != null) {
                FutureTask<T> futureTask = new FutureTask<T>(new Callable<T>() {
                    @Override
                    public T call() throws Exception {
                        return result;
                    }
                });
                futureTask.run();
                return futureTask;
            }
        } catch (Exception e) {
            throw new RpcException(e);
        } finally {
            // 移除async key
            removeAttachment(Constants.ASYNC_KEY);
        }
    } catch (final RpcException e) {
        return new Future<T>() {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public T get() throws InterruptedException, ExecutionException {
                throw new ExecutionException(e.getCause());
            }

            @Override
            public T get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return get();
            }
        };
    }
    return ((Future<T>) getContext().getFuture()); // 从当前线程的RpcContext中获取FutureAdapter对象
}
2.final String helloStr = helloFuture.get();

与同步调用时相同。

四、总结

Dubbo异步与同步调用的差别:

  • 同步:创建DefaultFuture之后,直接get阻塞等待;
  • 异步:创建DefaultFuture之后,使用FutureAdapter进行包装,之后设置到当前线程的RpcContext中;后续用户在合适的时候自己从RpcContext获取future,之后get