Dubbo Client Request Encoding

Encode Request

Posted by Jay on April 15, 2019

This analysis assumes Dubbo uses Netty 4 as its transport framework.

The overall flow of client-side request encoding is as follows:

NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // create a buffer that wraps the Netty ByteBuf
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
  -->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
      -->encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
        -->getSerialization(Channel channel)   // obtain the Hessian2Serialization instance
          -->CodecSupport.getSerialization(URL url)
            -->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))
        <!-- build a 16-byte header: byte[16] -->
        -->byte[] header = new byte[16]
        -->Bytes.short2bytes(MAGIC, header)  // set the first two bytes to the magic number 0xdabb
        <!-- the third byte carries: request flag, serialization ID, two-way/event flags -->
        -->header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
           if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
           if (req.isEvent()) header[2] |= FLAG_EVENT;
        <!-- set bytes 5~12 (a long is 64 bits, i.e. 8 bytes): request ID -->
        -->Bytes.long2bytes(req.getId(), header, 4);
        <!-- serialize the request body -->
        -->new Hessian2ObjectOutput(out)
        -->DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data)
        -->Bytes.int2bytes(len, header, 12); // set bytes 13~16 (an int is 32 bits, i.e. 4 bytes): body length
        -->buffer.writeBytes(header); // write the header into the first 16 bytes of the buffer

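The overall flow is straightforward:

  • create a buffer;
  • create a 16-byte header (byte[16]) and fill in the magic number, request flag, serialization ID, two-way/event flags, and request ID;
  • serialize the request body, writing the serialized bytes into the buffer starting at the 17th byte;
  • write the body length into the header, then write the header into the first 16 bytes of the buffer;
  • finally, send the buffer.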

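First, look at where the Netty codec is installed, taking com.alibaba.dubbo.remoting.transport.netty4.NettyServer as the example (NettyClient builds its pipeline with the same NettyCodecAdapter):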

@Override
protected void doOpen() throws Throwable {
    // set the logger factory
    NettyHelper.setNettyLoggerFactory();

    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    // Netty server-side handler
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
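            // TCP-level options
            // TCP_NODELAY=true disables Nagle's algorithm, for low-latency delivery
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            // SO_REUSEADDR=true allows the local address and port to be reused
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            // use the pooled allocator so that buffers are reused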
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // constructor arguments: <DubboCountCodec, provider url, ChannelHandler (the NettyServer instance)>
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            .addLast("decoder", adapter.getDecoder())  // decoder
                            .addLast("encoder", adapter.getEncoder())  // encoder
                            .addLast("handler", nettyServerHandler);   // server-side logic handler for inbound and outbound events
                }
            });
    // bind [id: 0x7b324585, /0.0.0.0:20881], listening on any IP address
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    // wait synchronously until the bind completes
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();

}

NettyCodecAdapter:

final class NettyCodecAdapter {

    // message encoder
    private final ChannelHandler encoder = new InternalEncoder();

    // message decoder
    private final ChannelHandler decoder = new InternalDecoder();

    // DubboCountCodec instance
    private final Codec2         codec;
    
    private final URL            url;

    // Dubbo channel handler
    private final com.alibaba.dubbo.remoting.ChannelHandler handler;

    // Constructor
    // @param codec  DubboCountCodec instance
    // @param url provider url (on the consumer side: the provider url merged with the consumer parameters)
    // @param handler NettyServer/NettyClient
    NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
        this.codec = codec;
        this.url = url;
        this.handler = handler;
    }

    ChannelHandler getEncoder() {
        return encoder;
    }

    ChannelHandler getDecoder() {
        return decoder;
    }

    // encoder
    private class InternalEncoder extends MessageToByteEncoder {

        @Override
        protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            // wrap Netty's ByteBuf
            com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
            Channel ch = ctx.channel();
            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
            try {
                // encode
                codec.encode(channel, buffer, msg);
            } finally {
                NettyChannel.removeChannelIfDisconnected(ch);
            }
        }
    }

    // decoder
    private class InternalDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
            // wrap Netty's ByteBuf
            ChannelBuffer message = new NettyBackedChannelBuffer(input);
            // obtain the NettyChannel
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            Object msg;

            int saveReaderIndex;

            try {
                // decode object.
                do {
                    saveReaderIndex = message.readerIndex();
                    // decode the message
                    msg = codec.decode(channel, message);
                    // the decoder needs more bytes than are currently available
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                        // so reset the reader index and wait for more data
                        message.readerIndex(saveReaderIndex);
                        break;
                    } else {
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) {
                            throw new IOException("Decode without read data.");
                        }
                        // a complete message was decoded: add it to List<Object> out
                        if (msg != null) {
                            out.add(msg);
                        }
                    }
                } while (message.readable());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
    }
}

I. Create the ChannelBuffer

com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);

Here, out is:

ByteBuf buffer = PooledUnsafeDirectByteBuf

NettyBackedChannelBuffer:

// underlying byte buffer
private ByteBuf buffer;

public NettyBackedChannelBuffer(ByteBuf buffer) {
    Assert.notNull(buffer, "buffer == null");
    this.buffer = buffer;
}

The resulting buffer:

NettyBackedChannelBuffer
	-->ByteBuf buffer = PooledUnsafeDirectByteBuf

II. Obtain the NettyChannel

Next, the io.netty.channel.Channel instance is obtained and wrapped in a NettyChannel.

NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);

NettyChannel:

// map from Netty channels to Dubbo NettyChannel wrappers
private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();

// the Netty channel
private final Channel channel;

private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();

private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
    super(url, handler);
    if (channel == null) {
        throw new IllegalArgumentException("netty channel == null;");
    }
    this.channel = channel;
}

// Looks up the cached NettyChannel for a Netty Channel, creating one if absent
// @param ch {@link Channel}  Netty Channel
// @param url provider url / provider url merged with the consumer parameters
// @param handler NettyServer/NettyClient
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
    if (ch == null) {
        return null;
    }
    NettyChannel ret = channelMap.get(ch);
    if (ret == null) {
        NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
        if (ch.isActive()) {
            ret = channelMap.putIfAbsent(ch, nettyChannel);
        }
        if (ret == null) {
            ret = nettyChannel;
        }
    }
    return ret;
}

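The method first looks up the NettyChannel keyed by the io.netty.channel.Channel in the cache ConcurrentMap<Channel, NettyChannel> channelMap. If one exists it is returned; otherwise a new one is created and returned. Note that the new instance is only put into the cache when the Netty channel is active.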
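
The get-or-create logic above can be sketched in isolation. This is a minimal illustration rather than Dubbo's code: RawChannel and WrappedChannel are hypothetical stand-ins for io.netty.channel.Channel and NettyChannel. It shows how putIfAbsent resolves a race between two callers, and why an inactive channel is wrapped but never cached.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ChannelCacheSketch {
    // Hypothetical stand-in for io.netty.channel.Channel.
    static final class RawChannel {
        final boolean active;
        RawChannel(boolean active) { this.active = active; }
    }

    // Hypothetical stand-in for Dubbo's NettyChannel wrapper.
    static final class WrappedChannel {
        final RawChannel raw;
        WrappedChannel(RawChannel raw) { this.raw = raw; }
    }

    static final ConcurrentMap<RawChannel, WrappedChannel> CACHE = new ConcurrentHashMap<>();

    static WrappedChannel getOrAdd(RawChannel ch) {
        if (ch == null) {
            return null;
        }
        WrappedChannel ret = CACHE.get(ch);
        if (ret == null) {
            WrappedChannel created = new WrappedChannel(ch);
            if (ch.active) {
                // putIfAbsent returns the existing value if another thread won the race,
                // or null if our instance was stored.
                ret = CACHE.putIfAbsent(ch, created);
            }
            if (ret == null) {
                ret = created;
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        RawChannel active = new RawChannel(true);
        System.out.println(getOrAdd(active) == getOrAdd(active));     // true: cached
        RawChannel inactive = new RawChannel(false);
        System.out.println(getOrAdd(inactive) == getOrAdd(inactive)); // false: never cached
    }
}
```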

The resulting NettyChannel instance looks like this:

-->Channel channel = NioSocketChannel
-->ChannelHandler handler = NettyClient
-->URL url = dubbo://192.168.0.100:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&codec=dubbo&default.client=netty4&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,sayBye&pid=34996&register.ip=192.168.0.100&remote.timestamp=1555341190543&side=consumer&timestamp=1555341214411

III. Encode

codec.encode(channel, buffer, msg)

Here, codec is:

Codec2 codec = DubboCountCodec
	-->DubboCodec codec = new DubboCodec()

DubboCountCodec:

private DubboCodec codec = new DubboCodec();

// Encode
// @param channel the NettyChannel object
// @param buffer the NettyBackedChannelBuffer object
// @param msg Request/Response
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}

Arguments:

  • channel: the NettyChannel object described above

  • buffer: the NettyBackedChannelBuffer object described above

  • msg: a Request object with the following properties:

    long mId = 0
    String mVersion = "2.0.0"
    boolean mTwoWay = true
    boolean mEvent = false
    boolean mBroken = false
    Object mData = RpcInvocation object
    -->String methodName = "sayHello"
    -->Class<?>[] parameterTypes = [java.lang.String]
    -->Object[] arguments = ["world"]
    -->Map<String, String> attachments = {
         "path" -> "com.alibaba.dubbo.demo.DemoService"
         "interface" -> "com.alibaba.dubbo.demo.DemoService"
         "version" -> "0.0.0"
    }
    -->Invoker<?> invoker = DubboInvoker object
    

Next, DubboCodec.encode(Channel channel, ChannelBuffer buffer, Object msg) is called. This method is defined in its parent class, ExchangeCodec.

// Encode a request/response
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
// @param msg Request/Response
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel); // obtain the serialization
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id.
    Bytes.long2bytes(req.getId(), header, 4); // long id, 8 bytes

    // encode request data.
    int savedWriteIndex = buffer.writerIndex(); // current writer index
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // move the writer index to 0+16: skip the header and write the body bytes first
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // Hessian2ObjectOutput
    if (req.isEvent()) {
        // event
        encodeEventData(channel, out, req.getData());
    } else {
        // request
        encodeRequestData(channel, out, req.getData());
    }
    out.flushBuffer(); // Hessian2ObjectOutput flushes into ChannelBufferOutputStream bos
    bos.flush();
    bos.close();
    int len = bos.writtenBytes(); // number of body bytes
    checkPayload(channel, len); // check that the payload does not exceed the limit
    Bytes.int2bytes(len, header, 12); // len is an int, 4 bytes

    // write the header
    buffer.writerIndex(savedWriteIndex); // go back to savedWriteIndex
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // move the writer index to the end of the body
}

1. Obtain the serialization via the SPI mechanism

Serialization serialization = getSerialization(channel); // obtain the serialization

The getSerialization method is defined in AbstractCodec, the parent class of ExchangeCodec.

protected Serialization getSerialization(Channel channel) {
    return CodecSupport.getSerialization(channel.getUrl());
}

// CodecSupport
public static Serialization getSerialization(URL url) {
    return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
        url.getParameter("serialization", "hessian2"));
}

The Serialization obtained here is a Hessian2Serialization object:

public class Hessian2Serialization implements Serialization {

    public static final byte ID = 2;

    public byte getContentTypeId() {
        return ID;
    }

    public String getContentType() {
        return "x-application/hessian2";
    }

    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new Hessian2ObjectOutput(out);
    }

    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new Hessian2ObjectInput(is);
    }

}

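Note: the ID of the hessian2 serialization is 2. This ID is written into the protocol header and sent to the server, which uses it to look up the matching serialization and deserialize the request body.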
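
To make the role of the serialization ID concrete, here is a minimal sketch of how it shares the third header byte with the request, two-way, and event flags, and how a receiver could recover it. The constant values (0x80, 0x40, 0x20, and the 0x1f mask) are the ones I recall from ExchangeCodec and should be treated as assumptions to verify against your Dubbo version; the class and method names are illustrative only.

```java
class FlagByteSketch {
    // Assumed to match ExchangeCodec; verify against your Dubbo version.
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    // Packs the flags and the serialization ID into one byte, as encodeRequest does for header[2].
    static byte pack(byte serializationId, boolean twoWay, boolean event) {
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        return flag;
    }

    // The low 5 bits carry the serialization ID.
    static int serializationId(byte flag) { return flag & SERIALIZATION_MASK; }
    static boolean isRequest(byte flag) { return (flag & FLAG_REQUEST) != 0; }
    static boolean isTwoWay(byte flag) { return (flag & FLAG_TWOWAY) != 0; }
    static boolean isEvent(byte flag) { return (flag & FLAG_EVENT) != 0; }

    public static void main(String[] args) {
        byte flag = pack((byte) 2, true, false);   // hessian2, two-way, not an event
        System.out.printf("0x%02x%n", flag & 0xff); // 0xc2
        System.out.println(serializationId(flag));  // 2
    }
}
```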

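2. Create the 16-byte header array

byte[] header = new byte[16];

Then fill bytes 1~2 with the magic number; fill byte 3 with the request flag, the serialization ID (2 here), and the two-way or event flag; and fill bytes 5~12 with the request ID (long = 64 bits = 8 bytes). Byte 4 stays 0 for a request; in a response it carries the status.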

// set magic number.
Bytes.short2bytes(MAGIC, header);

// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;

// set request id.
Bytes.long2bytes(req.getId(), header, 4); // long id, 8 bytes
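
The Bytes helpers used above write values in big-endian order (most significant byte first), which is why the magic number 0xdabb ends up as 0xda in header[0] and 0xbb in header[1]. The following is a minimal re-implementation for illustration, not the Dubbo source; the method names mirror the originals but take an explicit offset in every case.

```java
class BigEndianSketch {
    // Writes a 16-bit value, high byte first.
    static void short2bytes(short v, byte[] b, int off) {
        b[off] = (byte) (v >>> 8);
        b[off + 1] = (byte) v;
    }

    // Writes a 32-bit value, high byte first.
    static void int2bytes(int v, byte[] b, int off) {
        for (int i = 0; i < 4; i++) {
            b[off + i] = (byte) (v >>> (24 - 8 * i));
        }
    }

    // Writes a 64-bit value, high byte first.
    static void long2bytes(long v, byte[] b, int off) {
        for (int i = 0; i < 8; i++) {
            b[off + i] = (byte) (v >>> (56 - 8 * i));
        }
    }

    public static void main(String[] args) {
        byte[] header = new byte[16];
        short2bytes((short) 0xdabb, header, 0); // magic number
        long2bytes(1L, header, 4);              // request ID
        int2bytes(210, header, 12);             // body length
        StringBuilder hex = new StringBuilder();
        for (byte x : header) {
            hex.append(String.format("%02x ", x & 0xff));
        }
        System.out.println(hex.toString().trim());
    }
}
```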

3. Serialize the request body

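First, set the buffer's writerIndex:

int savedWriteIndex = buffer.writerIndex(); // current writer index
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // move the writer index to 0+16: skip the header and write the body bytes first

This saves the buffer's current writerIndex. The range from that position to position + 15 will hold the header byte array (for example, [0, 15]), and the request body bytes are written from position + 16 onward (for example, [16, x)).

The writerIndex is then moved to the current position + 16. The body has to be serialized and written into the buffer first, because the body length stored in the header is only known afterwards; the header is written into the buffer last.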
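
The reserve-then-backfill technique can be sketched with a plain java.nio.ByteBuffer, using its position in place of Netty's writerIndex. This is only an illustration under simplifying assumptions: the body is passed in as ready-made bytes, the flag byte is hard-coded to 0xc2 (request, two-way, hessian2), and the buffer is sized exactly, whereas the real code writes through a growable buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BackfillSketch {
    static final int HEADER_LENGTH = 16;

    static ByteBuffer encode(byte[] body, long requestId) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);

        int saved = buf.position();           // remember where the frame starts
        buf.position(saved + HEADER_LENGTH);  // reserve 16 bytes for the header
        buf.put(body);                        // write the body first
        int end = buf.position();
        int len = end - (saved + HEADER_LENGTH); // body length is known only now

        buf.position(saved);                  // go back and fill in the header
        buf.putShort((short) 0xdabb);         // bytes 1~2: magic number
        buf.put((byte) 0xc2);                 // byte 3: flags + serialization ID (assumed value)
        buf.put((byte) 0);                    // byte 4: unused for a request
        buf.putLong(requestId);               // bytes 5~12: request ID
        buf.putInt(len);                      // bytes 13~16: body length

        buf.position(end);                    // jump back to the end of the body
        buf.flip();                           // make the frame readable
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("hello".getBytes(StandardCharsets.UTF_8), 0L);
        System.out.println(frame.remaining()); // 21
        System.out.println(frame.getInt(12));  // 5
    }
}
```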

Serialize the request body:

ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // Hessian2ObjectOutput
if (req.isEvent()) {
    // event
    encodeEventData(channel, out, req.getData());
} else {
    // request
    encodeRequestData(channel, out, req.getData());
}
out.flushBuffer(); // Hessian2ObjectOutput flushes into ChannelBufferOutputStream bos
bos.flush();
bos.close();

First, a new ChannelBufferOutputStream object is created (this class extends the abstract class java.io.OutputStream):

private final ChannelBuffer buffer;
private final int startIndex;

// @param buffer NettyBackedChannelBuffer
public ChannelBufferOutputStream(ChannelBuffer buffer) {
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }
    this.buffer = buffer;
    startIndex = buffer.writerIndex(); // writer index at creation time
}

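buffer is the NettyBackedChannelBuffer object described above; startIndex == 16, since the writer index was 0 and has just been moved past the 16-byte header.

Then obtain the ObjectOutput object:

// Hessian2Serialization: out is the ChannelBufferOutputStream object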
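
Recording startIndex at construction time is what later lets bos.writtenBytes() report the body length. Here is a minimal sketch of that idea over a java.nio.ByteBuffer; it assumes writtenBytes() is simply the current write position minus startIndex, and the class name is illustrative.

```java
import java.io.OutputStream;
import java.nio.ByteBuffer;

class CountingStreamSketch extends OutputStream {
    private final ByteBuffer buffer;
    private final int startIndex;

    CountingStreamSketch(ByteBuffer buffer) {
        if (buffer == null) {
            throw new NullPointerException("buffer");
        }
        this.buffer = buffer;
        this.startIndex = buffer.position(); // where the body starts
    }

    @Override
    public void write(int b) {
        buffer.put((byte) b);
    }

    @Override
    public void write(byte[] b, int off, int len) {
        if (len == 0) {
            return;
        }
        buffer.put(b, off, len);
    }

    // Bytes written through this stream since it was created.
    int writtenBytes() {
        return buffer.position() - startIndex;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.position(16);                          // header space already reserved
        CountingStreamSketch bos = new CountingStreamSketch(buf);
        bos.write(new byte[] {1, 2, 3, 4, 5}, 0, 5);
        System.out.println(bos.writtenBytes());    // 5
    }
}
```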
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
    return new Hessian2ObjectOutput(out);
}

// Hessian2ObjectOutput
private final Hessian2Output mH2o;

// os is the ChannelBufferOutputStream object
public Hessian2ObjectOutput(OutputStream os) {
    mH2o = new Hessian2Output(os);
    mH2o.setSerializerFactory(Hessian2SerializerFactory.SERIALIZER_FACTORY);
}

// Hessian2Output
public final static int SIZE = 4096;
private final byte[] _buffer = new byte[SIZE];
// the output stream
protected OutputStream _os;

// Creates a new Hessian output stream, initialized with an
// underlying output stream.
// @param os the underlying output stream.
public Hessian2Output(OutputStream os) {
    _os = os;
}

The resulting ObjectOutput object:

Hessian2ObjectOutput
-->Hessian2Output mH2o
   -->byte[] _buffer = new byte[4096]
   -->OutputStream _os = the ChannelBufferOutputStream object described above
   -->SerializerFactory _serializerFactory = Hessian2SerializerFactory instance

Finally, DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data) is executed. This is where the request body is actually serialized.

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    out.writeObject(inv.getAttachments());
}

Here, channel is the NettyChannel instance described above; out is the Hessian2ObjectOutput instance described above; data is the data property of the Request object (an RpcInvocation object):

Object mData = RpcInvocation object
-->String methodName = "sayHello"
-->Class<?>[] parameterTypes = [java.lang.String]
-->Object[] arguments = ["world"]
-->Map<String, String> attachments = {
     "path" -> "com.alibaba.dubbo.demo.DemoService"
     "interface" -> "com.alibaba.dubbo.demo.DemoService"
     "version" -> "0.0.0"
}
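-->Invoker<?> invoker = DubboInvoker object

From DubboCodec.encodeRequestData we can see that only the following fields of the RpcInvocation object in the request are serialized:

  • methodName: the method name
  • parameterTypes: the parameter types
  • arguments: the argument values
  • attachments: the attachments

Among the attachments, "dubbo", "path", and "version" are additionally serialized on their own with out.writeUTF.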
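
The order in which these fields reach the stream matters, because the server has to read them back in the same order. The sketch below reproduces only that ordering, using java.io.DataOutputStream as a stand-in for Hessian2ObjectOutput, so the bytes it produces are not the Hessian2 wire format. The argument list is simplified to strings, and the descriptor "Ljava/lang/String;" is what I would expect ReflectUtils.getDesc to return for a single String parameter; treat both as assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

class RequestBodyOrderSketch {
    static byte[] encode(String dubboVersion, String path, String version,
                         String methodName, String paramDesc,
                         String[] args, Map<String, String> attachments) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(dubboVersion);   // 1. "dubbo" attachment
            out.writeUTF(path);           // 2. "path" attachment
            out.writeUTF(version);        // 3. "version" attachment
            out.writeUTF(methodName);     // 4. method name
            out.writeUTF(paramDesc);      // 5. parameter type descriptor
            for (String arg : args) {     // 6. each argument in order
                out.writeUTF(arg);
            }
            out.writeInt(attachments.size()); // 7. the whole attachments map
            for (Map.Entry<String, String> e : attachments.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> attachments = new LinkedHashMap<>();
        attachments.put("path", "com.alibaba.dubbo.demo.DemoService");
        attachments.put("interface", "com.alibaba.dubbo.demo.DemoService");
        attachments.put("version", "0.0.0");
        byte[] body = encode("2.0.0", "com.alibaba.dubbo.demo.DemoService", "0.0.0",
                "sayHello", "Ljava/lang/String;", new String[] {"world"}, attachments);
        System.out.println(body.length + " bytes");
    }
}
```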

First, look at writeUTF:

Hessian2ObjectOutput.writeUTF(String v)
	-->Hessian2Output.writeString(String value)
		-->printString(String v, int strOffset, int length) 

The printString method stores the given v in the byte[] _buffer = new byte[4096] array of the Hessian2Output object.

Hessian2Output
    // Writes any object to the output stream.
    public void writeObject(Object object) throws IOException {
        if (object == null) {
            writeNull();
            return;
        }
        // obtain the serializer
        Serializer serializer = findSerializerFactory().getSerializer(object.getClass());
        serializer.writeObject(object, this);
    }

    public final SerializerFactory findSerializerFactory() {
        SerializerFactory factory = _serializerFactory;
        if (factory == null)
            _serializerFactory = factory = new SerializerFactory();
        return factory;
    }

    SerializerFactory
    private static HashMap _staticSerializerMap;
    private HashMap _cachedSerializerMap;
    // Returns the serializer for a class.
    // @param cl the class of the object that needs to be serialized.
    // @return a serializer object for the serialization.
    public Serializer getSerializer(Class cl)
            throws HessianProtocolException {
        Serializer serializer;

        serializer = (Serializer) _staticSerializerMap.get(cl);
        if (serializer != null)
            return serializer;

        if (_cachedSerializerMap != null) {
            synchronized (_cachedSerializerMap) {
                serializer = (Serializer) _cachedSerializerMap.get(cl);
            }

            if (serializer != null)
                return serializer;
        }
        ......
        if (serializer != null) {

        } 
        .......
        else if (Map.class.isAssignableFrom(cl)) {
            if (_mapSerializer == null)
                _mapSerializer = new MapSerializer();

            serializer = _mapSerializer;
        } 
        ......
        if (serializer == null)
            serializer = getDefaultSerializer(cl);

        if (_cachedSerializerMap == null)
            _cachedSerializerMap = new HashMap(8);

        synchronized (_cachedSerializerMap) {
            _cachedSerializerMap.put(cl, serializer);
        }

        return serializer;
    }

Hessian2Output.writeObject(Object object):

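It first obtains the _serializerFactory, which here is a Hessian2SerializerFactory instance. Its getSerializer(Class cl) method is defined in the parent class SerializerFactory. The lookup works as follows: first try _staticSerializerMap, which is pre-populated at startup with serializers for a set of common types (see com.alibaba.com.caucho.hessian.io.SerializerFactory), and return the serializer if one is found. Otherwise look in the _cachedSerializerMap cache. If it is not there either, create a serializer according to the type (for example new MapSerializer(), with getDefaultSerializer(cl) as the fallback), put it into _cachedSerializerMap, and return the newly created serializer.

For the attachments map, MapSerializer.writeObject(Object obj, AbstractHessianOutput out) is then called to perform the serialization.

Once DubboCodec.encodeRequestData has finished, all of the information has been written into the byte[] _buffer = new byte[4096] array of the Hessian2Output object.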
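
The lookup order described above (static map, then instance cache, then create and cache) can be sketched as follows. This is a simplified illustration, not the Hessian source: the Serializer interface here is a hypothetical one-method stand-in, the static map holds a single entry, and the whole method is synchronized for brevity instead of locking only around the cache.

```java
import java.util.HashMap;
import java.util.Map;

class SerializerLookupSketch {
    // Hypothetical stand-in for Hessian's Serializer.
    interface Serializer {
        String name();
    }

    // Pre-populated once, shared by all factories.
    private static final Map<Class<?>, Serializer> STATIC_MAP = new HashMap<>();
    static {
        STATIC_MAP.put(String.class, () -> "string");
    }

    // Per-factory cache of serializers created on demand.
    private final Map<Class<?>, Serializer> cachedMap = new HashMap<>();
    private Serializer mapSerializer;

    synchronized Serializer getSerializer(Class<?> cl) {
        Serializer serializer = STATIC_MAP.get(cl);    // 1. built-in types
        if (serializer != null) {
            return serializer;
        }
        serializer = cachedMap.get(cl);                // 2. previously created
        if (serializer != null) {
            return serializer;
        }
        if (Map.class.isAssignableFrom(cl)) {          // 3. create by type
            if (mapSerializer == null) {
                mapSerializer = () -> "map";
            }
            serializer = mapSerializer;
        } else {
            serializer = () -> "default";              // fallback
        }
        cachedMap.put(cl, serializer);                 // 4. remember for next time
        return serializer;
    }

    public static void main(String[] args) {
        SerializerLookupSketch factory = new SerializerLookupSketch();
        System.out.println(factory.getSerializer(HashMap.class).name()); // map
        System.out.println(factory.getSerializer(String.class).name());  // string
    }
}
```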

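Note

If the number of bytes exceeds 4096 while data is being written to _buffer, Hessian2Output.flushBuffer() is executed first to copy the contents of _buffer into the PooledUnsafeDirectByteBuf, and writing into _buffer then continues.

Finally, Hessian2ObjectOutput.flushBuffer() is executed: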
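
The overflow behaviour can be illustrated with a small fixed-size staging buffer that flushes to its underlying stream whenever it fills up. This is a sketch of the general technique only; the exact conditions under which Hessian2Output flushes differ in detail, and the class below is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class StagingBufferSketch {
    static final int SIZE = 4096;

    private final byte[] buffer = new byte[SIZE];
    private int offset = 0;
    private final OutputStream os;
    int flushes = 0; // how many times data was pushed to the underlying stream

    StagingBufferSketch(OutputStream os) {
        this.os = os;
    }

    void write(byte[] src) throws IOException {
        int pos = 0;
        while (pos < src.length) {
            if (offset == SIZE) {
                flushBuffer(); // staging buffer is full: drain it before continuing
            }
            int n = Math.min(SIZE - offset, src.length - pos);
            System.arraycopy(src, pos, buffer, offset, n);
            offset += n;
            pos += n;
        }
    }

    // Copies whatever is staged to the underlying stream and resets the offset.
    void flushBuffer() throws IOException {
        if (offset > 0) {
            os.write(buffer, 0, offset);
            offset = 0;
            flushes++;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        StagingBufferSketch out = new StagingBufferSketch(sink);
        out.write(new byte[5000]);
        System.out.println(sink.size()); // 4096: one automatic flush so far
        out.flushBuffer();
        System.out.println(sink.size()); // 5000: the final explicit flush
    }
}
```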

// Hessian2ObjectOutput
public void flushBuffer() throws IOException {
    mH2o.flushBuffer();
}

// Hessian2Output
 public final void flushBuffer() throws IOException {
   int offset = _offset;

   if (!_isStreaming && offset > 0) {
     _offset = 0;

     _os.write(_buffer, 0, offset);
   } else if (_isStreaming && offset > 3) {
     int len = offset - 3;
     _buffer[0] = 'p';
     _buffer[1] = (byte) (len >> 8);
     _buffer[2] = (byte) len;
     _offset = 3;

     _os.write(_buffer, 0, offset);
   }
 }

This in turn calls ChannelBufferOutputStream.write(byte[] b, int off, int len):

public void write(byte[] b, int off, int len) throws IOException {
    if (len == 0) {
        return;
    }

    buffer.writeBytes(b, off, len);
}

// ChannelBuffer
// Transfers the specified source array's data to this buffer starting at
// the current {@code writerIndex} and increases the {@code writerIndex} by
// the number of the transferred bytes (= {@code length}).
//
// @param index  the first index of the source
// @param length the number of bytes to transfer
// @throws IndexOutOfBoundsException if the specified {@code srcIndex} is
//                                   less than {@code 0}, if {@code srcIndex
//                                   + length} is greater than {@code
//                                   src.length}, or if {@code length} is
//                                   greater than {@code this.writableBytes}
void writeBytes(byte[] src, int index, int length);

In other words, the data in the byte[] _buffer = new byte[4096] array of the Hessian2Output object is transferred into the buffer:

NettyBackedChannelBuffer
	-->ByteBuf buffer = PooledUnsafeDirectByteBuf
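
4. Write the header into the buffer

int len = bos.writtenBytes(); // compute the number of body bytes
checkPayload(channel, len); // check that the payload does not exceed the limit
Bytes.int2bytes(len, header, 12); // write the body length into bytes 13~16 of the header (int = 4 bytes)

// write the header
buffer.writerIndex(savedWriteIndex); // move the writerIndex back to where this write started
buffer.writeBytes(header); // write the header array into the buffer
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // move the writer index to the end of the body

At this point the encoding is complete. The ByteBuf holding the header and body is then sent over the network by Netty itself.

Here is the final layout of the byte[] header for an encoded request: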

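  • bytes 1~2: magic number
  • byte 3: request flag, serialization ID, two-way or event flag
  • byte 4: unused in a request (carries the status in a response)
  • bytes 5~12: request ID
  • bytes 13~16: request body length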
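
Reading the layout back is a useful check on the byte positions listed above. The sketch below parses a 16-byte header with java.nio.ByteBuffer, which is big-endian by default. The flag masks are assumed to match ExchangeCodec, and the class is illustrative rather than part of Dubbo.

```java
import java.nio.ByteBuffer;

class HeaderParseSketch {
    static String describe(byte[] header) {
        ByteBuffer b = ByteBuffer.wrap(header);
        int magic = b.getShort(0) & 0xffff;   // bytes 1~2
        byte flag = b.get(2);                 // byte 3
        long requestId = b.getLong(4);        // bytes 5~12
        int bodyLength = b.getInt(12);        // bytes 13~16
        return String.format(
                "magic=0x%04x request=%b twoWay=%b event=%b serialization=%d id=%d bodyLength=%d",
                magic,
                (flag & 0x80) != 0,           // assumed FLAG_REQUEST
                (flag & 0x40) != 0,           // assumed FLAG_TWOWAY
                (flag & 0x20) != 0,           // assumed FLAG_EVENT
                flag & 0x1f,                  // assumed SERIALIZATION_MASK
                requestId,
                bodyLength);
    }

    public static void main(String[] args) {
        byte[] header = new byte[16];
        header[0] = (byte) 0xda;
        header[1] = (byte) 0xbb;
        header[2] = (byte) 0xc2;
        header[15] = (byte) 210;
        System.out.println(describe(header));
        // magic=0xdabb request=true twoWay=true event=false serialization=2 id=0 bodyLength=210
    }
}
```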

One side note on checkPayload:

protected static void checkPayload(Channel channel, long size) throws IOException {
    int payload = Constants.DEFAULT_PAYLOAD;
    if (channel != null && channel.getUrl() != null) {
        payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
    }
    if (payload > 0 && size > payload) {
        ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
        logger.error(e);
        throw e;
    }
}

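By default, Dubbo throws an exception if the request body is larger than 8 MB (Constants.DEFAULT_PAYLOAD). The limit can be overridden through the payload URL parameter, and a value of 0 or less disables the check.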
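
The boundary behaviour of this check is easy to get wrong, so here is a minimal sketch of the same condition. It assumes the default of 8 * 1024 * 1024 bytes, takes the configured limit as a plain int instead of reading it from the URL, and throws a plain IOException in place of ExceedPayloadLimitException.

```java
import java.io.IOException;

class PayloadCheckSketch {
    static final int DEFAULT_PAYLOAD = 8 * 1024 * 1024; // 8 MB, assumed default

    static void checkPayload(int payload, long size) throws IOException {
        // A non-positive limit disables the check; a size equal to the limit is still allowed.
        if (payload > 0 && size > payload) {
            throw new IOException("Data length too large: " + size + ", max payload: " + payload);
        }
    }

    public static void main(String[] args) {
        try {
            checkPayload(DEFAULT_PAYLOAD, DEFAULT_PAYLOAD);      // passes
            checkPayload(0, Long.MAX_VALUE);                     // passes: check disabled
            checkPayload(DEFAULT_PAYLOAD, DEFAULT_PAYLOAD + 1L); // throws
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```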