Dubbo 客户端接收响应流程分析(异步转同步实现)
一、总体流程
// 客户端接收响应
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->NettyClient.received(NettyChannel, Response)
-->AbstractPeer.received(NettyChannel, Response)
-->MultiMessageHandler.received(NettyChannel, Response)
-->HeartbeatHandler.received(NettyChannel, Response)
-->AllChannelHandler.received(NettyChannel, Response)
-->ExecutorService cexecutor = getExecutorService() // 消费者CachedThreadPool
-->executor.execute(new ChannelEventRunnable(NettyChannel, DecodeHandler, ChannelState.RECEIVED, Response))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(NettyChannel,Response)
-->decode(RpcResult)
-->HeaderExchangeHandler.received(NettyChannel, Response)
-->handleResponse(NettyChannel, Response)
-->DefaultFuture.received(NettyChannel, Response)
-->doReceived(Response) // 异步转同步
二、源码分析
在HeaderExchangeHandler.received(Channel channel, Object message)
方法之前,客户端接收响应Response
与服务端接收请求Request
一样,不再分析。
HeaderExchangeHandler.received(Channel channel, Object message)
:
// 收到Response
// @param channel NettyChannel.
// @param message Response
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 设置读时间戳
// 根据NettyChannel,获取HeaderExchangeChannel。NettyChannel绑定一个HeaderExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// ...
} else if (message instanceof Response) {
// 处理响应--消费者端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
private static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) { // 非心跳响应
DefaultFuture.received(channel, response);
}
}
DefaultFuture.received(Channel channel, Response response
:
private final long id; // invoke id
private final Request request;
private final int timeout;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response; // 响应
private volatile ResponseCallback callback;
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // <invokeId, 异步调用计算结果>
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); // <invokeId, Dubbo数据通道>
// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture
if (future != null) {
future.doReceived(response); // 异步转同步
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
// 异步转同步
private void doReceived(Response res) {
lock.lock();
try {
response = res; // 设置response
if (done != null) {
done.signal(); // 唤醒阻塞的线程
}
} finally {
lock.unlock();
}
if (callback != null) {
// 触发回调
invokeCallback(callback);
}
}
这里比较难懂,再给出客户端发出请求时的一段代码:HeaderExchangeChannel.request(Object request, int timeout)
:
// 客户端发送请求,request RpcInvocation对象,timeout 超时时间
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request
+ ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0"); // Dubbo 协议版本
req.setTwoWay(true); // 是否双向,有返回值
req.setData(request); // 设置Data为Invocation
// <NettyClient, Request, timeout>
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
netty
是一个异步非阻塞的框架,所以当执行channel.send(req);
的时候,当其内部执行到netty
发送请求消息时,不会等待结果,直接返回。为了实现“异步转为同步”,使用了DefaultFuture
这个辅助类。
在HeaderExchangeChannel.request(Object request, int timeout)
中,在还没有等到客户端的响应回来的时候,就直接将future
返回了。返回给谁?再来看HeaderExchangeChannel.request(Object request, int timeout)
的调用者。
-->DubboInvoker.doInvoke(final Invocation invocation)
//获取ExchangeClient进行消息的发送
-->ReferenceCountExchangeClient.request(Object request, int timeout),request--RpcInvocation
-->HeaderExchangeClient.request(Object request, int timeout)
-->HeaderExchangeChannel.request(Object request, int timeout)
DubboInvoker.doInvoke(final Invocation invocation)
:
// 实际调用过程
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation); // 方法名
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version
// 确定客户端
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步
boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
if (isOneway) { // 无返回值
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) { // 异步有返回值
ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { // 同步有返回值
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待
}
} catch (TimeoutException e) { // 超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) { // 网络异常
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
+ invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
其中currentClient.request(inv, timeout)
返回值是ResponseFuture
,DefaultFuture
是ResponseFuture
的实现类,实际上这里返回的就是DefaultFuture
实例,而该实例就是HeaderExchangeChannel.request(Object request, int timeout)
返回的那个future实
例。之后调用DefaultFuture.get()
。
public Object get() throws RemotingException {
return get(timeout);
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) { // 未得到响应结果
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break; // 已完成或超时,退出
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
// sent > 0,说明客户端已经发送出去请求,是服务端的问题
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
此处可知当响应response
没有回来时,condition
会执行await
进行阻塞当前线程,直到被唤醒或被中断或阻塞时间耗尽。当客户端接收到服务端的响应的时候,DefaultFuture.doReceived
方法会先为response
赋上返回值,之后执行condition
的signal
唤醒被阻塞的线程,get()
方法就会继续执行,执行returnFromResponse()
并返回结果。
// 获取response结果,返回
private Object returnFromResponse() throws RemotingException {
Response res = response;
if (res == null) {
throw new IllegalStateException("response cannot be null");
}
if (res.getStatus() == Response.OK) {
return res.getResult(); // 获取结果
}
if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
}
throw new RemotingException(channel, res.getErrorMessage());
}
到现在其实还有一个问题,就是netty
是异步非阻塞的,那么假设现在发了1w个Request
,后来返回来1w个Response
,那么怎么对应Request
和Response
呢?如果对应不上,最起码的唤醒就会有问题。为了解决这个问题提,Request
和Response
中都有一个属性id。
在HeaderExchangeChannel.request(Object request, int timeout)
中:
// create request.
Request req = new Request();
req.setVersion("2.0.0"); // Dubbo 协议版本
req.setTwoWay(true); // 是否双向,有返回值
req.setData(request); // 设置Data为Invocation
// <NettyClient, Request, timeout>
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
看一下Request
的构造器:
private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId; // invoke id,m表示消息message
public Request() {
mId = newId();
}
private static long newId() {
// getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
return INVOKE_ID.getAndIncrement();
}
看一下DefaultFuture
的构造器:
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // <invokeId, 异步调用计算结果>
private final long id; // invoke id
private final Request request;
private volatile Response response;
public DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId(); // invoke id
this.timeout = timeout > 0 ? timeout :
// channel.getUrl()--合并消费者参数之后的提供者url
channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
再来看一下响应。
HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req)
:
private Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion()); // 构造response
// ...
// find handler by message class.
Object msg = req.getData(); // RpcInvocation
try {
// handle data.
Object result = handler.reply(channel, msg); // 返回值RpcResult
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
来看一下Response
的构造器:
private long mId = 0;
private String mVersion;
public Response(long id, String version) {
mId = id;
mVersion = version;
}
这里response
的id
的值是request
的id
。最后来看一下服务端接收后的处理:
DefaultFuture.received(Channel channel, Response response)
:
// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture,并从FUTURES中删除该DefaultFuture
if (future != null) {
future.doReceived(response); // 异步转同步
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
至此,客户端接收响应分析完毕。