Dubbo Server-Side Request Decoding

Decode Request

Posted by Jay on April 16, 2019

Overall call chain of server-side request decoding

NettyCodecAdapter$InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // create a buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
  -->ExchangeCodec.decode(Channel channel, ChannelBuffer buffer)
    -->buffer.readBytes(header); // read the header byte[]
    -->decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
      -->check the magic number, and check that the total readable byte count is >= 16
      -->get the request body length
      -->new ChannelBufferInputStream(buffer, len)
      -->DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
        -->CodecSupport.getSerialization(URL url, Byte id) // extract the serialization ID from header[2] and use it to look up the same serialization protocol the request was encoded with
        -->Bytes.bytes2long(header, 4) // get the request id
        <!-- then create a new Request object and fill it with the request id and the request attributes parsed afterwards -->
        -->new DecodeableRpcInvocation(channel, req, is, proto)
          -->DecodeableRpcInvocation.decode()
            -->decode(Channel channel, InputStream input) // parse the request body into a DecodeableRpcInvocation, which ends up in the data property of the Request object
              -->new Hessian2ObjectInput(InputStream is)
                -->Hessian2ObjectInput.readObject(Class<?>)/readUTF() // deserialize
                  -->Hessian2Input.readObject(Class cl)/readString()
                    -->ChannelBufferInputStream.read(byte[] b, int off, int len)

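Summary of the flow

  • Wrap the incoming ByteBuf in a NettyBackedChannelBuffer (the "buffer" below)
  • Read the header from the buffer
  • Check the magic number, and check that the total readable byte count is at least 16
  • Get the length of the request body
  • Extract the serialization ID from header[2] and use it to look up the same serialization protocol the request was encoded with
  • Get the request ID
  • Create a Request object and fill it with the request ID and the request attributes parsed afterwards
  • Deserialize the request body into a DecodeableRpcInvocation, then store that object in the data property of the Request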
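The header steps above can be sketched with plain java.nio. This is an illustration only, not Dubbo's code: the class HeaderSketch and its methods are hypothetical names, and the layout (magic 0xdabb in bytes 0-1, flag in byte 2, request id at offset 4, body length at offset 12, all big-endian) is taken from the offsets used in the listings below.

```java
import java.nio.ByteBuffer;

/** Illustrative parser for the fixed 16-byte header; not a Dubbo class. */
final class HeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;

    /** Returns the body length, or -1 when more input is needed. */
    static int bodyLengthOrNeedMore(byte[] bytes) {
        if (bytes.length < HEADER_LENGTH) {
            return -1; // the header itself is incomplete
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        if (buf.getShort(0) != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        int len = buf.getInt(12); // body length lives in bytes 12..15
        if (bytes.length < HEADER_LENGTH + len) {
            return -1; // header is complete but the body is not
        }
        return len;
    }

    /** Reads the 8-byte request id stored at offset 4. */
    static long requestId(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong(4);
    }

    /** Builds a sample frame so the parser can be exercised without a network. */
    static byte[] sampleFrame(long id, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putShort(MAGIC);
        buf.put((byte) 0xc2); // flag: request + two-way + serialization id 2
        buf.put((byte) 0);    // status byte, not used for requests
        buf.putLong(id);
        buf.putInt(body.length);
        buf.put(body);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = sampleFrame(7L, new byte[] {1, 2, 3});
        System.out.println(requestId(frame) + " " + bodyLengthOrNeedMore(frame));
    }
}
```

Returning -1 here plays the role that DecodeResult.NEED_MORE_INPUT plays in the real codec: the caller keeps the bytes and retries once more data has arrived.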

Decoding starts in NettyCodecAdapter:

// Decoder
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        // wrap Netty's ByteBuf
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        // get the NettyChannel
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        Object msg;

        int saveReaderIndex;

        try {
            // decode object.
            do {
                saveReaderIndex = message.readerIndex();
                // decode the message
                msg = codec.decode(channel, message);
                // decoding found that more bytes are needed
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    // so reset the read index and decode again once more data has arrived
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ? (nothing was read)
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    // a complete message was decoded, so add it to List<Object> out
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
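The save-and-reset pattern in this loop can be tried out without Netty. The sketch below is a stand-in, not the adapter itself: DecodeLoopSketch, its toy length-prefixed codec, and its NEED_MORE_INPUT sentinel are all hypothetical, and java.nio.ByteBuffer's position takes the place of the reader index.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class DecodeLoopSketch {
    /** Sentinel playing the role of Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Toy codec: 4-byte big-endian length followed by a UTF-8 payload. */
    static Object decodeOne(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return NEED_MORE_INPUT;
        }
        int len = buf.getInt();
        if (buf.remaining() < len) {
            return NEED_MORE_INPUT;
        }
        byte[] payload = new byte[len];
        buf.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    /** Decodes every complete frame; leaves a trailing partial frame unread. */
    static List<Object> decodeAll(ByteBuffer buf) {
        List<Object> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            int saved = buf.position();      // remember where this frame starts
            Object msg = decodeOne(buf);
            if (msg == NEED_MORE_INPUT) {
                buf.position(saved);         // rewind so the next attempt sees the whole frame
                break;
            }
            out.add(msg);
        }
        return out;
    }

    /** Encodes one frame for the toy codec. */
    static byte[] frame(String s) {
        byte[] p = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length).putInt(p.length).put(p).array();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(frame("a")).put(frame("bc")).put(frame("def"), 0, 5); // third frame is cut short
        buf.flip();
        System.out.println(decodeAll(buf) + ", bytes left: " + buf.remaining());
    }
}
```

Rewinding on NEED_MORE_INPUT matters because the toy decodeOne, like the real codec, may already have consumed the length prefix before noticing that the body is incomplete.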

1. Creating the ChannelBuffer

ChannelBuffer message = new NettyBackedChannelBuffer(input);

As with client-side request encoding, the resulting message is:

NettyBackedChannelBuffer
   -->ByteBuf buffer = PooledUnsafeDirectByteBuf

2. Obtaining the NettyChannel

Next, the io.netty.channel.Channel instance is obtained and wrapped in a NettyChannel.

NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

As with client-side request encoding, the resulting channel is:

-->Channel channel = NioSocketChannel
-->ChannelHandler handler = NettyServer
-->URL url = dubbo://192.168.0.100:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20881&cellinvokemode=sharing&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,sayBye&pid=51897&side=provider&timestamp=1555424608881

3. Decoding

The codec here is:

Codec2 codec = DubboCountCodec
  -->DubboCodec codec = new DubboCodec()

DubboCountCodec:

// Decode
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex(); // current read index
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
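        // if the buffer does not hold enough data, leave the loop; the next attempt reads from save again
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            // reset the read index
            buffer.readerIndex(save);
            break;
        } else {
            // the message decoded normally, so add it to the List messages of MultiMessage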
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
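The three return cases at the end of this method (nothing decoded, exactly one message, several messages) can be isolated in a small sketch. BatchResultSketch and collapse are hypothetical names, and a plain List stands in for MultiMessage.

```java
import java.util.Arrays;
import java.util.List;

final class BatchResultSketch {
    /** Sentinel playing the role of Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Mirrors the return logic: sentinel, the single message, or the whole batch. */
    static Object collapse(List<?> decoded) {
        if (decoded.isEmpty()) {
            return NEED_MORE_INPUT;  // no complete message yet
        }
        if (decoded.size() == 1) {
            return decoded.get(0);   // unwrap a batch of one
        }
        return decoded;              // hand the batch on as a unit
    }

    public static void main(String[] args) {
        System.out.println(collapse(Arrays.asList("only")));
        System.out.println(collapse(Arrays.asList("a", "b")));
    }
}
```

Unwrapping a batch of one means downstream handlers only have to deal with the batch type when several messages really did arrive in one read.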

MultiMessage:

private final List messages = new ArrayList();

private MultiMessage() {
}

public static MultiMessage create() {
    return new MultiMessage();
}

public void addMessage(Object msg) {
    messages.add(msg);
}

ExchangeCodec, the parent class of DubboCodec:

// Decode
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes(); // number of readable bytes in the buffer
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header); // read up to the first 16 bytes of the buffer into header
    return decode(channel, buffer, readable, header); // deserialize the request body into a DecodeableRpcInvocation and store it in the data property of the Request
}

// Decode
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
// @param readable total number of readable bytes
// @param header the header bytes
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    // magic number mismatch: hand the data to the parent codec (TelnetCodec)
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) { // fewer than 16 readable bytes in total
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
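    int len = Bytes.bytes2int(header, 12); // read the body length from the header
    checkPayload(channel, len); // check the body against the payload limit (8 MB by default)

    int tt = len + HEADER_LENGTH;
    if (readable < tt) { // fewer readable bytes than header + body, so only part of the frame has arrived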
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header); // decode the body
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}
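The "limit input stream" step and the cleanup in the finally block can be illustrated with plain java.io. This is a sketch under assumptions: BoundedStreamSketch, BoundedInputStream, and consumeBody are hypothetical names standing in for ChannelBufferInputStream and StreamUtils.skipUnusedStream, whose real implementations differ.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class BoundedStreamSketch {
    /** Exposes at most `limit` bytes of the wrapped stream. */
    static final class BoundedInputStream extends InputStream {
        private final InputStream in;
        private int remaining;

        BoundedInputStream(InputStream in, int limit) {
            this.in = in;
            this.remaining = limit;
        }

        @Override
        public int read() throws IOException {
            if (remaining <= 0) {
                return -1; // body exhausted, even if the source has more bytes
            }
            int b = in.read();
            if (b >= 0) {
                remaining--;
            }
            return b;
        }

        @Override
        public int available() {
            return remaining;
        }
    }

    /**
     * Reads `wanted` bytes of a `len`-byte body, drains whatever the reader left
     * behind, and returns the first byte that follows the body in the source.
     */
    static int consumeBody(InputStream source, int len, int wanted) {
        BoundedInputStream is = new BoundedInputStream(source, len);
        try {
            try {
                for (int i = 0; i < wanted; i++) {
                    is.read();
                }
            } finally {
                // skip the unread remainder so the next frame starts at the right offset
                while (is.available() > 0 && is.read() >= 0) {
                    // discard
                }
            }
            return source.read();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream src = new ByteArrayInputStream(new byte[] {10, 11, 12, 13, 99});
        System.out.println(consumeBody(src, 4, 2));
    }
}
```

Bounding the stream keeps a misbehaving deserializer from reading into the next frame, and draining the remainder keeps a short read from leaving the buffer positioned in the middle of the current one.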

DubboCodec.decodeBody():

// Decode the body
// @param channel NettyChannel
// @param is ChannelBufferInputStream
// @param header the header bytes
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // proto: serialization id
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // look up the Serialization instance for that id
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // response decoding, performed on the client
        // decode response.
        ...
    } else {
        // request decoding, performed on the server
        // decode request.
        Request req = new Request(id); // build the Request
        req.setVersion("2.0.0");
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) { // heartbeat
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) { // event
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    // build the DecodeableRpcInvocation
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    // decode the request body
                    inv.decode();
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
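The bit tests on header[2] are easier to follow with concrete values. In the sketch below the class FlagSketch is hypothetical, and the constant values (0x80, 0x40, 0x20, 0x1f) are what I believe ExchangeCodec defines in Dubbo 2.x; treat them as an assumption and check them against the version you are reading.

```java
final class FlagSketch {
    // Assumed to mirror the constants in ExchangeCodec; verify against your Dubbo version.
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    static boolean isRequest(byte flag) {
        return (flag & FLAG_REQUEST) != 0;
    }

    static boolean isTwoWay(byte flag) {
        return (flag & FLAG_TWOWAY) != 0;
    }

    static boolean isEvent(byte flag) {
        return (flag & FLAG_EVENT) != 0;
    }

    /** The low five bits carry the serialization id. */
    static int serializationId(byte flag) {
        return flag & SERIALIZATION_MASK;
    }

    public static void main(String[] args) {
        byte flag = (byte) 0xc2; // request + two-way + serialization id 2
        System.out.println(isRequest(flag) + " " + isTwoWay(flag) + " "
                + isEvent(flag) + " " + serializationId(flag));
    }
}
```

Packing the message kind and the serialization id into one byte is what lets decodeBody choose both the branch (request or response) and the deserializer before it reads a single byte of the body.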

In short, this method builds the Request object; the key part is its data property, which for an ordinary RPC call is a DecodeableRpcInvocation instance.

DecodeableRpcInvocation:

public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream); // decode
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
            }
            request.setBroken(true);
            request.setData(e);
        } finally {
            hasDecoded = true;
        }
    }
}
// Decode the request body
// @param channel NettyChannel
// @param input   ChannelBufferInputStream
public Object decode(Channel channel, InputStream input) throws IOException {
    // create the Hessian2ObjectInput
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);
    // the read order below must exactly match the write order used during serialization
    setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());

    setMethodName(in.readUTF());
    try {
        Object[] args;
        Class<?>[] pts;
        String desc = in.readUTF();
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            pts = ReflectUtils.desc2classArray(desc);
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);

        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            attachment.putAll(map);
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    }
    return this;
}

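The setXXX methods above simply set properties on the current DecodeableRpcInvocation. in.readUTF() and in.readObject() are both deserialization methods: the former deserializes a byte[] into a String, the latter deserializes a byte[] into an Object.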
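Why the read order has to match the write order can be shown with the JDK's own streams. This is a stand-in, not the Hessian2 wire format: FieldOrderSketch is a hypothetical class, and DataOutputStream/DataInputStream replace Dubbo's ObjectOutput/ObjectInput.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class FieldOrderSketch {
    /** Writes the fields in the order the decoder expects them. */
    static byte[] encode(String dubboVersion, String path, String version,
                         String method, String paramDesc) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(dubboVersion);
            out.writeUTF(path);
            out.writeUTF(version);
            out.writeUTF(method);
            out.writeUTF(paramDesc);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the fields back; the stream carries no names, only positions. */
    static String[] decode(byte[] body) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(body));
            String[] fields = new String[5];
            for (int i = 0; i < fields.length; i++) {
                fields[i] = in.readUTF(); // i-th read returns the i-th written value
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = encode("2.0.0", "com.alibaba.dubbo.demo.DemoService",
                "0.0.0", "sayHello", "Ljava/lang/String;");
        System.out.println(decode(body)[3]);
    }
}
```

Because the body carries values without field names, a decoder that read the method name before the path would silently assign each string to the wrong property instead of failing.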

This concludes the server-side request decoding process.