Dubbo 客户端接收响应流程分析(异步转同步实现)

客户端接收响应

Posted by Jay on March 11, 2019

Dubbo 客户端接收响应流程分析(异步转同步实现)

一、总体流程

// 客户端接收响应
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->NettyClient.received(NettyChannel, Response)
  -->AbstractPeer.received(NettyChannel, Response)
    -->MultiMessageHandler.received(NettyChannel, Response)
      -->HeartbeatHandler.received(NettyChannel, Response)
        -->AllChannelHandler.received(NettyChannel, Response)
          -->ExecutorService cexecutor = getExecutorService() // 消费者CachedThreadPool
          -->executor.execute(new ChannelEventRunnable(NettyChannel, DecodeHandler, ChannelState.RECEIVED, Response))
            -->ChannelEventRunnable.run()
              -->DecodeHandler.received(NettyChannel,Response)
                -->decode(RpcResult)
                -->HeaderExchangeHandler.received(NettyChannel, Response)
                  -->handleResponse(NettyChannel, Response)
                    -->DefaultFuture.received(NettyChannel, Response) 
                      -->doReceived(Response) // 异步转同步

二、源码分析

HeaderExchangeHandler.received(Channel channel, Object message)方法之前,客户端接收响应Response与服务端接收请求Request一样,不再分析。

HeaderExchangeHandler.received(Channel channel, Object message):

// 收到Response
// @param channel NettyChannel.
// @param message Response
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 设置读时间戳
    // 根据NettyChannel,获取HeaderExchangeChannel。NettyChannel绑定一个HeaderExchangeChannel
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // ...
        } else if (message instanceof Response) {
            // 处理响应--消费者端
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

 private static void handleResponse(Channel channel, Response response) throws RemotingException {
     if (response != null && !response.isHeartbeat()) { // 非心跳响应
         DefaultFuture.received(channel, response);
     }
 }

DefaultFuture.received(Channel channel, Response response:

private final long id; // invoke id
private final Request request;
private final int timeout;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private volatile Response response; // 响应
private volatile ResponseCallback callback;

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // <invokeId, 异步调用计算结果>
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); // <invokeId, Dubbo数据通道>

// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture
        if (future != null) {
            future.doReceived(response); // 异步转同步
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

// 异步转同步
private void doReceived(Response res) {
    lock.lock();
    try {
        response = res; // 设置response
        if (done != null) {
            done.signal(); // 唤醒阻塞的线程
        }
    } finally {
        lock.unlock();
    }
    if (callback != null) {
        // 触发回调
        invokeCallback(callback);
    }
}

这里比较难懂,再给出客户端发出请求时的一段代码:HeaderExchangeChannel.request(Object request, int timeout):

// 客户端发送请求,request RpcInvocation对象,timeout 超时时间
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request
                + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    Request req = new Request();
    req.setVersion("2.0.0"); // Dubbo 协议版本
    req.setTwoWay(true); // 是否双向,有返回值
    req.setData(request); // 设置Data为Invocation
    // <NettyClient, Request, timeout>
    DefaultFuture future = new DefaultFuture(channel, req, timeout); 
    try {
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}

netty是一个异步非阻塞的框架,所以当执行channel.send(req);的时候,当其内部执行到netty发送请求消息时,不会等待结果,直接返回。为了实现“异步转为同步”,使用了DefaultFuture这个辅助类。

HeaderExchangeChannel.request(Object request, int timeout)中,在还没有等到客户端的响应回来的时候,就直接将future返回了。返回给谁?再来看HeaderExchangeChannel.request(Object request, int timeout)的调用者。

-->DubboInvoker.doInvoke(final Invocation invocation)
  //获取ExchangeClient进行消息的发送
  -->ReferenceCountExchangeClient.request(Object request, int timeout)request--RpcInvocation
	-->HeaderExchangeClient.request(Object request, int timeout)
	  -->HeaderExchangeChannel.request(Object request, int timeout)

DubboInvoker.doInvoke(final Invocation invocation):

// 实际调用过程
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation); // 方法名
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); // 服务路径,即业务接口 path
    inv.setAttachment(Constants.VERSION_KEY, version); // 服务版本 version

    // 确定客户端
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length]; // index递增
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否异步
        boolean isOneway = RpcUtils.isOneway (getUrl(), invocation); // 是否单向,不需要返回值
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // RPC调用默认超时1s
        if (isOneway) { // 无返回值
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent); // isSent是否等待请求发送完毕
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) { // 异步有返回值
            ResponseFuture future = currentClient.request(inv, timeout); // DefaultFuture
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else { // 同步有返回值
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get(); // 阻塞等待
        }
    } catch (TimeoutException e) { // 超时异常
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) { // 网络异常
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: "
                + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

其中currentClient.request(inv, timeout)返回值是ResponseFutureDefaultFutureResponseFuture的实现类,实际上这里返回的就是DefaultFuture实例,而该实例就是HeaderExchangeChannel.request(Object request, int timeout)返回的那个future实例。之后调用DefaultFuture.get()

public Object get() throws RemotingException {
    return get(timeout);
}

@Override
public Object get(int timeout) throws RemotingException {
    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }

    if (!isDone()) { // 未得到响应结果
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                if (isDone() || System.currentTimeMillis() - start > timeout) {
                    break; // 已完成或超时,退出
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            // sent > 0,说明客户端已经发送出去请求,是服务端的问题
            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }
    return returnFromResponse();
}

此处可知当响应response没有回来时,condition会执行await进行阻塞当前线程,直到被唤醒或被中断或阻塞时间耗尽。当客户端接收到服务端的响应的时候,DefaultFuture.doReceived方法会先为response赋上返回值,之后执行conditionsignal唤醒被阻塞的线程,get()方法就会继续执行,执行returnFromResponse()并返回结果。

// 获取response结果,返回
private Object returnFromResponse() throws RemotingException {
    Response res = response;
    if (res == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (res.getStatus() == Response.OK) {
        return res.getResult(); // 获取结果
    }
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }
    throw new RemotingException(channel, res.getErrorMessage());
}

到现在其实还有一个问题,就是netty是异步非阻塞的,那么假设现在发了1w个Request,后来返回来1w个Response,那么怎么对应RequestResponse呢?如果对应不上,最起码的唤醒就会有问题。为了解决这个问题提,RequestResponse中都有一个属性id。

HeaderExchangeChannel.request(Object request, int timeout)中:

 // create request.
Request req = new Request();
req.setVersion("2.0.0"); // Dubbo 协议版本
req.setTwoWay(true); // 是否双向,有返回值
req.setData(request); // 设置Data为Invocation
// <NettyClient, Request, timeout>
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try {
    channel.send(req);
} catch (RemotingException e) {
    future.cancel();
    throw e;
}
return future;

看一下Request的构造器:

private static final AtomicLong INVOKE_ID = new AtomicLong(0);
private final long mId; // invoke id,m表示消息message

public Request() {
    mId = newId();
}

private static long newId() {
    // getAndIncrement()增长到MAX_VALUE时,再增长会变为MIN_VALUE,负数也可以做为ID
    return INVOKE_ID.getAndIncrement();
}

看一下DefaultFuture的构造器:

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); // <invokeId, 异步调用计算结果>
private final long id; // invoke id
private final Request request;
private volatile Response response;

public DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId(); // invoke id
    this.timeout = timeout > 0 ? timeout :
            // channel.getUrl()--合并消费者参数之后的提供者url
            channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);

    FUTURES.put(id, this);
    CHANNELS.put(id, channel);
}

再来看一下响应。

HeaderExchangeHandler.handleRequest(ExchangeChannel channel, Request req):

private Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion()); // 构造response
    // ...
    // find handler by message class.
    Object msg = req.getData(); // RpcInvocation
    try {
        // handle data.
        Object result = handler.reply(channel, msg); // 返回值RpcResult
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}

来看一下Response的构造器:

private long mId = 0;
private String mVersion;

public Response(long id, String version) {
    mId = id;
    mVersion = version;
}

这里responseid的值是requestid。最后来看一下服务端接收后的处理:

DefaultFuture.received(Channel channel, Response response):

// 处理响应
// @param channel NettyChannel
// @param response Response
public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId()); // 根据请求响应的id,找到对应的DefaultFuture,并从FUTURES中删除该DefaultFuture
        if (future != null) {
            future.doReceived(response); // 异步转同步
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

至此,客户端接收响应分析完毕。