Dubbo 服务端响应编码

Encode Response

Posted by Jay on April 18, 2019

Dubbo 服务端响应编码

服务端响应编码总体流程:

NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
  -->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
      -->encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
        -->getSerialization(Channel channel)   // 获取Hessian2Serialization序列化方式实例
          -->CodecSupport.getSerialization(URL url)
            -->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))
        <!-- 构造一个16字节的byte[16] header -->
        -->byte[] header = new byte[16]
        -->Bytes.short2bytes(MAGIC, header)  // 设置前两个字节为魔数0xdabb
        <!-- 第三个字节序列化方式ID如果响应是心跳添加eventFlag -->
        -->header[2] = serialization.getContentTypeId();
         if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
      <!-- 第四个字节响应状态 -->
        -->header[3] = res.getStatus();
      <!-- 设置第5~12个字节long是64bit即8byte):respID == requestID -->
      -->Bytes.long2bytes(res.getId(), header, 4);
      <!-- 下面序列化响应体数据 -->
      -->new Hessian2ObjectOutput(out)
      -->DubboCodec.encodeResponseData(Channel channel, ObjectOutput out, Object data)
      -->Bytes.int2bytes(len, header, 12); // 设置第13~16个字节(int是32位,4个字节):响应消息体长度
      -->buffer.writeBytes(header); // 将header写入buffer的前16字节

服务端响应编码过程与客户端请求编码过程很相似。

// ExchangeCodec.encodeResponse()  响应编码
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
// @param res Response
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        Serialization serialization = getSerialization(channel); // 序列化实例
        // header.
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        Bytes.short2bytes(MAGIC, header);
        // set request and serialization flag.
        header[2] = serialization.getContentTypeId();
        if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
        // set response status.
        byte status = res.getStatus();
        header[3] = status;
        // set request id.
        Bytes.long2bytes(res.getId(), header, 4);

        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // buffer先写入响应体
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // out--Hessian2ObjectOutput对象
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // encode response data or error message.
        if (status == Response.OK) { // 服务端响应ok(包含服务端接口方法执行时的异常)
            if (res.isHeartbeat()) { // 心跳
                encodeHeartbeatData(channel, out, res.getResult());
            } else {
                // 正常响应
                encodeResponseData(channel, out, res.getResult());
            }
        } else out.writeUTF(res.getErrorMessage()); // 服务端响应status不是OK,直接写出错误内容
        out.flushBuffer();
        bos.flush();
        bos.close();

        int len = bos.writtenBytes(); // 响应体数据长度
        checkPayload(channel, len);
        Bytes.int2bytes(len, header, 12); // header写入响应体长度
        // write
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header); // write header. buffer写入header
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // 将已写buffer内容清空
        buffer.writerIndex(savedWriteIndex);
        // 发送失败信息给Consumer,否则Consumer只能等超时了
        if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
            Response r = new Response(res.getId(), res.getVersion());
            r.setStatus(Response.BAD_RESPONSE);

            if (t instanceof ExceedPayloadLimitException) {
                logger.warn(t.getMessage(), t);
                try {
                    r.setErrorMessage(t.getMessage());
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                }
            } else {
                // FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?
                logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                try {
                    r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                }
            }
        }

        // 重新抛出收到的异常
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new RuntimeException(t.getMessage(), t);
        }
    }
}

DubboCodec.encodeResponseData:

// DubboCodec.encodeResponseData() 序列化response body数据
// @param channel NettyChannel
// @param out Hessian2ObjectOutput对象
// @param data RpcResult
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
    Result result = (Result) data;

    Throwable th = result.getException();
    if (th == null) { // 无异常
        Object ret = result.getValue();
        if (ret == null) {
            out.writeByte(RESPONSE_NULL_VALUE); // 返回值为null
        } else {
            out.writeByte(RESPONSE_VALUE); // 返回值是正常值,先写RESPONSE_VALUE标志
            out.writeObject(ret); // 再写出返回值
        }
    } else {
        // 存在异常
        out.writeByte(RESPONSE_WITH_EXCEPTION); // 先写出异常标志
        out.writeObject(th); // 再写异常数据
    }
}

注意:out.writeByte(RESPONSE_VALUE);写入这个响应类型,是为了后面客户端解码响应Response的时候用的。

客户端请求编码与服务端响应编码的对比

  • 客户端请求编码的byte[] header的最终结构:
    • 第1~2 byte:魔数
    • 第3 byte:requestFlag、序列化方式ID、twowayFlag或eventFlag
    • 第5~12 byte :requestID
    • 第13~16 byte:请求体长度
  • 服务端响应编码的byte[] header的最终结构:

    • 第1~2 byte:魔数
    • 第3 byte:序列化方式ID、eventFlag(如果响应信息是心跳信息,添加eventFlag)
    • 第4 byte:响应状态,20代表成功OK
    • 第5~12 byte :reponseID(实际上==requestID)
    • 第13~16 byte:响应体长度