Dubbo 客户端响应解码

Decode Response

Posted by Jay on April 18, 2019

Dubbo 客户端响应解码

客户端响应解码整体流程:

NettyCodecAdapter$InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
  -->ExchangeCodec.decode(Channel channel, ChannelBuffer buffer)
    -->buffer.readBytes(header); // 读取header byte[]
    -->decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
      -->检查魔数总共可读的字节数是否大于等于16
      -->获取响应消息体长度
      -->new ChannelBufferInputStream(buffer, len)
      -->DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
        -->CodecSupport.getSerialization(URL url, Byte id) // 解析出响应头header[2]中的序列化ID,根据该ID获取与请求编码相同的序列化协议
        -->Bytes.bytes2long(header, 4) // 获取respID
        <!-- 之后创建一个新的Response对象将respID及后续解析出来的各种resp属性塞入该对象中 -->
        -->new DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
          -->DecodeableRpcResult.decode()
            -->decode(Channel channel, InputStream input) // 解析响应体参数并将其构造为一个DecodeableRpcResult,最终塞到Response对象的mResult属性中
              -->ObjectInput in = new Hessian2ObjectInput(InputStream is)
              -->反序列化in.readObject()

客户端响应解码与服务端请求解码类似,下面只介绍不同点。

// DubboCodec.decodeBody   解码响应body
// @param channel NettyChannel
// @param is ChannelBufferInputStream
// @param header 头部数据
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // 获取序列化id
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 获取序列话方式
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {// 客户端收到的响应解码
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT); // 心跳事件
        }
        // get status.
        byte status = header[3];
        res.setStatus(status); // 设置响应状态
        if (status == Response.OK) { // 服务端响应OK(包含服务端服务接口方法调用时的异常)
            try {
                Object data;
                if (res.isHeartbeat()) {
                    // 心跳
                    // deserialize(s, channel.getUrl(), is)--Hessian2ObjectInput(is)
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (res.isEvent()) {
                    // 事件
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    // 正常业务响应
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                res.setResult(data); // 设置Result
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR); // 客户端解码出现异常
                res.setErrorMessage(StringUtils.toString(t));
            }
        } else {
            // 服务端响应status!=OK,直接读取错误消息
            res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        }
        return res;
    } else {
        ...
    }
}

DecodeableRpcResult.decode:

public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc result failed: " + e.getMessage(), e);
            }
            // 客户端解码出现错误
            response.setStatus(Response.CLIENT_ERROR);
            response.setErrorMessage(StringUtils.toString(e));
        } finally {
            hasDecoded = true;
        }
    }
}
// @param channel NettyChannel
// @param input   ChannelBufferInputStream
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input); // Hessian2ObjectInput(input)对象

    byte flag = in.readByte(); // 返回结果标志,这个值是服务端响应编码进来的,用来表示响应的返回类型
    switch (flag) {
        // 返回值为null,不作处理
        case DubboCodec.RESPONSE_NULL_VALUE:
            break;
        // 存在正常返回值
        case DubboCodec.RESPONSE_VALUE:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(invocation); // 获取返回值类型
              	// 读取并设置result
              	// TODO: 返回值类型为void,为什么还要读取in.readObject()??
                setValue(returnType == null || returnType.length == 0 ? in.readObject() :
                        (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
                                : in.readObject((Class<?>) returnType[0], returnType[1])));
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        // 服务端接口调用时抛出了异常
        case DubboCodec.RESPONSE_WITH_EXCEPTION:
            try {
                Object obj = in.readObject(); // 读取异常
                if (obj instanceof Throwable == false)
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                setException((Throwable) obj); // 设置服务端接口调用异常exception
            } catch (ClassNotFoundException e) {
                throw new IOException(StringUtils.toString("Read response data failed.", e));
            }
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    }
    return this;
}

setValue:设置DecodeableRpcResult的Object result属性。

setException:设置DecodeableRpcResult的Throwable exception属性。

至此,客户端解码响应Response流程结束。