Dubbo 异步调用原理分析
Dubbo 客户端发起请求过程分析、Dubbo 服务端接收请求并发送响应流程分析、Dubbo 客户端接收响应流程分析(异步转同步实现)这三篇文章分析了Dubbo
同步调用的流程,下面来看Dubbo
异步调用的原理。
一、使用方式
服务提供方不变,消费方代码如下:
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService">
<dubbo:method name="sayHello" async="true"/>
<dubbo:method name="sayBye" async="true"/>
</dubbo:reference>
配置里添加<dubbo:method name="xxx" async="true"/>
,表示单个方法xxx使用异步方式;如果DemoService
下的所有方法都使用异步,直接配置为<dubbo:reference async="true"/>
。
public static void main(String[] args) throws Exception {
System.setProperty("java.net.preferIPv4Stack", "true");
// asyncFuture1();
asyncFuture2();
}
public static void asyncFuture1() throws ExecutionException, InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-withzk-consumer.xml"});
context.start();
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
long start = System.currentTimeMillis();
demoService.sayHello("zhangsan");
Future<String> helloFuture = RpcContext.getContext().getFuture();
demoService.sayBye("lisi");
Future<String> byeFuture = RpcContext.getContext().getFuture();
final String helloStr = helloFuture.get();//消耗5s
final String byeStr = byeFuture.get();//消耗8s
System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis() - start));//总消耗8s
}
public static void asyncFuture2() throws ExecutionException, InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-withzk-consumer.xml"});
context.start();
DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy
long start = System.currentTimeMillis();
Future<String> helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello("zhangsan"));
Future<String> byeFuture = RpcContext.getContext().asyncCall(()->demoService.sayBye("lisi"));
final String helloStr = helloFuture.get();//消耗5s
final String byeStr = byeFuture.get();//消耗8s
System.out.println(helloStr + " -- " + byeStr + " ,cost:" + (System.currentTimeMillis()-start));//总消耗8s
}
以上是Consumer
启动主类的代码。其中asyncFuture2()
方法是推荐用法,注意Callable
(asyncCall
方法的入参)只是一个任务task
,不会新建线程;所以asyncFuture2()
和asyncFuture1()
相似,资源占用相同,都是用一个线程进行异步操作的。
二、asyncFuture1分析
先来看asyncFuture1()
,总体步骤如下:
demoService.sayHello("zhangsan");
创建一个Future
对象(FutureAdapter
),存入当前线程的上下文中;Future<String> helloFuture = RpcContext.getContext().getFuture();
从当前线程的上下文中获取第一步存入的Future
对象;final String helloStr = helloFuture.get();
阻塞等待,从Future
中获取结果。
代码主要执行流程如下(代码详细执行流程参考文章开头的三篇文章):
1.demoService.sayHello(“zhangsan”);
-->FutureFilter.invoke(final Invoker<?> invoker, final Invocation invocation)
-->DubboInvoker.doInvoke(final Invocation invocation)
FutureFilter
:
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行
fireInvokeCallback(invoker, invocation);
//需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future.
Result result = invoker.invoke(invocation); // 发起调用
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
return result;
}
对于如上的异步操作(asyncFuture1()
和asyncFuture2()
),FutureFilter
没起任何作用,该Filter
主要会用在事件通知中,后续再说。
DubboInvoker.doInvoke(final Invocation invocation)
:
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation); // 方法名
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version
// 确定客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步
boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
if (isOneway) { // 无论同步或异步,无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { // 异步有返回值
ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { // 同步有返回值
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待
}
} catch (TimeoutException e) { // 超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) { // 网络异常
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
逻辑:
- 如果是
isOneway
(不需要返回值),不管同步还是异步,请求直接发出,不会创建Future
,直接返回RpcResult
空对象。 - 如果是
isAsync
(异步),则- 先创建
ResponseFuture
对象,之后使用FutureAdapter
包装该ResponseFuture
对象;(创建ResponseFuture
对象与同步的代码相同,最后得到的是一个DefaultFuture
对象); - 然后将该
FutureAdapter
对象设入当前线程的上下文中RpcContext.getContext();
- 最后返回空的
RpcResult
。
- 先创建
- 如果是同步,则先创建
ResponseFuture
对象,之后直接调用其get()
方法进行阻塞调用。
简单来看一下FutureAdapter
:
public class FutureAdapter<V> implements Future<V> {
// 异步调用响应结果 DefaultFuture
private final ResponseFuture future;
public FutureAdapter(ResponseFuture future) {
this.future = future;
}
public ResponseFuture getFuture() {
return future;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return future.isDone();
}
// 获取结果
@Override
@SuppressWarnings("unchecked")
public V get() throws InterruptedException, ExecutionException {
try {
return (V) (((Result) future.get()).recreate());
} catch (RemotingException e) {
throw new ExecutionException(e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e);
}
}
// 获取结果
@Override
@SuppressWarnings("unchecked")
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
int timeoutInMillis = (int) TimeUnit.MILLISECONDS.convert(timeout, unit);
try {
return (V) (((Result) future.get(timeoutInMillis)).recreate());
} catch (com.alibaba.dubbo.remoting.TimeoutException e) {
throw new TimeoutException(StringUtils.toString(e));
} catch (RemotingException e) {
throw new ExecutionException(e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e);
}
}
}
最后,回头看一下FutureFilter
:
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // 是否异步执行
fireInvokeCallback(invoker, invocation);
//需要在调用前配置好是否有返回值(return参数),已供invoker判断是否需要返回future.
Result result = invoker.invoke(invocation); // 发起调用
if (isAsync) {
asyncCallback(invoker, invocation);
} else {
syncCallback(invoker, invocation, result);
}
return result;
}
// 异步回调。
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
Future<?> future = RpcContext.getContext().getFuture(); // null/FutureAdapter
if (future instanceof FutureAdapter) {
// 异步调用结果适配器
ResponseFuture responseFuture = ((FutureAdapter<?>) future).getFuture(); // 异步调用响应结果 DefaultFuture
responseFuture.setCallback(new ResponseCallback() { // 设置回调
// 响应结果回调
@Override
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value: null, expected "
+ Result.class.getName()));
return;
}
// must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type: " + rpcResult.getClass()
+ ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
// 异步调用发生异常
fireThrowCallback(invoker, invocation, result.getException());
} else {
// 返回时的回调
fireReturnCallback(invoker, invocation, result.getValue());
}
}
@Override
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
});
}
}
这里的future
对象是之前创建好的FutureAdapter
对象,responseFuture
是FutureAdapter
包含的DefaultFuture
对象。DefaultFuture
代码片段如下:
private volatile Response response; // 响应
private volatile ResponseCallback callback; // 响应回调
// 响应是否已经到达消费者端
public boolean isDone() {
return response != null;
}
public void setCallback(ResponseCallback callback) {
if (isDone()) {
// 已经获取到响应,直接调用回调对象
invokeCallback(callback);
} else {
boolean isDone = false; // 是否获取到响应
lock.lock();
try {
if (!isDone()) {
this.callback = callback;
} else {
isDone = true;
}
} finally {
lock.unlock();
}
if (isDone) {
invokeCallback(callback);
}
}
}
这里判断响应是否已经返回,如果返回了,直接执行invokeCallback(callback)
,否则将传入的ResponseCallback
对象赋值给callback
变量。
2.Future<String>
helloFuture = RpcContext.getContext().getFuture();
RpcContext
:
private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
@Override
protected RpcContext initialValue() {
return new RpcContext();
}
};
private Future<?> future;
public static RpcContext getContext() {
return LOCAL.get();
}
public <T> Future<T> getFuture() {
return (Future<T>) future;
}
从当前线程上下文中获取之前存进去的FutureAdapter
对象。
3.final String helloStr = helloFuture.get();
helloFuture
是上述的FutureAdapter
对象,其get()
调用的是内部的DefaultFuture
的get()
,该方法与同步调用时相同,源码分析见文章开头的三篇文章。
public V get() throws InterruptedException, ExecutionException {
try {
return (V) (((Result) future.get()).recreate());
} catch (RemotingException e) {
throw new ExecutionException(e.getMessage(), e);
} catch (Throwable e) {
throw new RpcException(e);
}
}
get
方法的超时设置除了直接在xml中配置之外,还可以在代码中手动执行(优先级高)。
final String helloStr2 = helloFuture.get(3000, TimeUnit.MILLISECONDS);
三、asyncFuture2()分析
下面来看一下asyncFuture2()
的源码:
1.Future<String>
helloFuture = RpcContext.getContext().asyncCall(()-> demoService.sayHello(“zhangsan”));
public <T> Future<T> asyncCall(Callable<T> callable) {
try {
try {
// Java代码设置异步async=true
setAttachment(Constants.ASYNC_KEY, Boolean.toString(true));
// 执行传入的任务(此处创建FutureAdapter对象,并且设置到当前线程的RpcContext的future变量中)
final T result = callable.call(); // RPC异步调用,返回null
// local调用会直接返回结果.
if (result != null) {
FutureTask<T> futureTask = new FutureTask<T>(new Callable<T>() {
@Override
public T call() throws Exception {
return result;
}
});
futureTask.run();
return futureTask;
}
} catch (Exception e) {
throw new RpcException(e);
} finally {
// 移除async key
removeAttachment(Constants.ASYNC_KEY);
}
} catch (final RpcException e) {
return new Future<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
@Override
public T get() throws InterruptedException, ExecutionException {
throw new ExecutionException(e.getCause());
}
@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return get();
}
};
}
return ((Future<T>) getContext().getFuture()); // 从当前线程的RpcContext中获取FutureAdapter对象
}
2.final String helloStr = helloFuture.get();
与同步调用时相同。
四、总结
Dubbo
异步与同步调用的差别:
- 同步:创建
DefaultFuture
之后,直接get
阻塞等待; - 异步:创建
DefaultFuture
之后,使用FutureAdapter
进行包装,之后设置到当前线程的RpcContext
中;后续用户在合适的时候自己从RpcContext
获取future
,之后get
。