Dubbo 客户端请求编码
以Dubbo使用Netty4为通信框架来进行分析。
客户端请求编码总体流程如下:
NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Channel ch, Object msg)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
-->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
-->encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
-->getSerialization(Channel channel) // 获取Hessian2Serialization序列化实例
-->CodecSupport.getSerialization(URL url)
-->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))
<!-- 构造一个16字节的byte[16] header -->
-->byte[] header = new byte[16]
-->Bytes.short2bytes(MAGIC, header) //设置前两个字节为魔数0xdabb
<!-- 第三个字节:表示消息是req,序列化方式ID,twoway/event -->
-->header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
<!-- 设置第5~12个字节(long是64bit,即8byte):requestID -->
-->Bytes.long2bytes(req.getId(), header, 4);
<!-- 下面序列化请求体数据 -->
-->new Hessian2ObjectOutput(out)
-->DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data)
-->Bytes.int2bytes(len, header, 12); // 设置第13~16个字节(int是32位,4个字节):消息体长度
-->buffer.writeBytes(header); // 将header写入buffer的前16个字节
总体流程很简单:
- 创建一个buffer;
- 创建一个16位的byte[16] header,将魔数、请求标志、序列化方式ID、twoway/event标志、requestID、请求体长度写入header;
- 之后序列化请求体,从buffer的第17个字节向后写入序列化后的请求体字节数组;
- 最后,将header中的内容写入buffer的前16个字节;
- 最后发送buffer。
首先来看一下Netty编解码的入口com.alibaba.dubbo.remoting.transport.netty4.NettyServer:
@Override
protected void doOpen() throws Throwable {
// 设置logger factory
NettyHelper.setNettyLoggerFactory();
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// Netty服务端处理器
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 设置TCP底层相关的属性
// 是否开启Nagle算法,true表示关闭,要求数据的高实时性
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
// 是否允许重复使用本地地址和端口,true表示允许
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
// 使用对象池,重用缓冲区
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 构造器参数: <DubboCountCodec,提供者url,ChannelHandler(NettyServer实例)>
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder()) // 解码器
.addLast("encoder", adapter.getEncoder()) // 编码器
.addLast("handler", nettyServerHandler); // 服务端逻辑处理器,处理入站、出站事件
}
});
// bind [id: 0x7b324585, /0.0.0.0:20881] 监听在任意ip地址
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
// 同步等待bind完成
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
NettyCodecAdapter:
final class NettyCodecAdapter {
// 消息编码器
private final ChannelHandler encoder = new InternalEncoder();
// 消息解码器
private final ChannelHandler decoder = new InternalDecoder();
// DubboCountCodec实例
private final Codec2 codec;
private final URL url;
// 数据通道处理器
private final com.alibaba.dubbo.remoting.ChannelHandler handler;
// 初始化
// @param codec DubboCountCodec实例
// @param url 提供者url(消费者端是合并消费者参数之后的提供者url)
// @param handler NettyServer/NettyClient
NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
}
ChannelHandler getEncoder() {
return encoder;
}
ChannelHandler getDecoder() {
return decoder;
}
// 编码器
private class InternalEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
// 包装Netty的ByteBuf
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
try {
// 编码
codec.encode(channel, buffer, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ch);
}
}
}
// 解码器
private class InternalDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
// 包装Netty的ByteBuf
ChannelBuffer message = new NettyBackedChannelBuffer(input);
// 获取NettyChannel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
Object msg;
int saveReaderIndex;
try {
// decode object.
do {
saveReaderIndex = message.readerIndex();
// 解码message
msg = codec.decode(channel, message);
// 解码时发现需要更多的字节数据
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
// 则重置读指针
message.readerIndex(saveReaderIndex);
break;
} else {
//is it possible to go here ?
if (saveReaderIndex == message.readerIndex()) {
throw new IOException("Decode without read data.");
}
// 如果读到了正常的消息,写入List<Object> out
if (msg != null) {
out.add(msg);
}
}
} while (message.readable());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
}
}
一、创建ChannelBuffer
com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
这里的out是:
ByteBuf buffer = PooledUnsafeDirectByteBuf
NettyBackedChannelBuffer:
// 字节缓冲区
private ByteBuf buffer;
public NettyBackedChannelBuffer(ByteBuf buffer) {
Assert.notNull(buffer, "buffer == null");
this.buffer = buffer;
}
最终的buffer:
NettyBackedChannelBuffer
-->ByteBuf buffer = PooledUnsafeDirectByteBuf
二、获取NettyChannel
之后先获取io.netty.channel实例,然后包装在NettyChannel中。
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
// Netty数据通道到Dubbo Netty数据通道的映射表
private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
// Netty数据通道
private final Channel channel;
private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
super(url, handler);
if (channel == null) {
throw new IllegalArgumentException("netty channel == null;");
}
this.channel = channel;
}
// 根据Netty Channel获取缓存NettyChannel,没有则新建
// @param ch {@link Channel} Netty Channel
// @param @param url 提供者url/合并消费者参数之后的提供者url
// @param handler NettyServer/NettyClient
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) {
if (ch == null) {
return null;
}
NettyChannel ret = channelMap.get(ch);
if (ret == null) {
NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
if (ch.isActive()) {
ret = channelMap.putIfAbsent(ch, nettyChannel);
}
if (ret == null) {
ret = nettyChannel;
}
}
return ret;
}
首先从缓存ConcurrentMap<Channel, NettyChannel> channelMap中获取key=io.netty.channel的NettyChannel,有则返回,没有则新建并返回。
最终获取到的NettyChannel实例如下:
1 -->Channel channel = NioSocketChannel
2 -->ChannelHandler handler = NettyClient
3 -->URL url = dubbo://192.168.0.100:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-consumer&cellinvokemode=sharing&check=false&codec=dubbo&default.client=netty4&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,sayBye&pid=34996®ister.ip=192.168.0.100&remote.timestamp=1555341190543&side=consumer×tamp=1555341214411
三、进行编码
codec.encode(channel, buffer, msg)
这里的codec是:
Codec2 codec = DubboCountCodec
-->DubboCodec codec = new DubboCodec()
DubboCountCodec:
private DubboCodec codec = new DubboCodec();
// 编码
// @param channel NettyChannel对象
// @param buffer NettyBackedChannelBuffer对象
// @param msg Request/Response
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
codec.encode(channel, buffer, msg);
}
入参:
-
channel:上述的NettyChannel对象
-
buffer:上述的NettyBackedChannelBuffer对象
-
msg:Request对象,其属性如下:
long mId = 0 String mVersion = "2.0.0" boolean mTwoWay = true boolean mEvent = false boolean mBroken = false Object mData = RpcInvocation对象 -->String methodName = "sayHello" -->Class<?>[] parameterTypes = [java.lang.String] -->Object[] arguments = ["world"] -->Map<String, String> attachments = { "path" -> "com.alibaba.dubbo.demo.DemoService" "interface" -> "com.alibaba.dubbo.demo.DemoService" "version" -> "0.0.0" } -->Invoker<?> invoker = DubboInvoker对象
之后调用DubboCodec.encode(Channel channel, ChannelBuffer buffer, Object msg),该方法位于其父类ExchangeCodec中。
// 请求/响应编码
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
// @param msg Request/Response
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel); // 获取序列化方式
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.
Bytes.long2bytes(req.getId(), header, 4); // long id ,8 字节
// encode request data.
int savedWriteIndex = buffer.writerIndex(); // 写索引
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // 设置写索引为0+16,先跳过header,先写请求体的字节
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // Hessian2ObjectOutput
if (req.isEvent()) {
// 事件
encodeEventData(channel, out, req.getData());
} else {
// 请求
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer(); // Hessian2ObjectOutput写出到ChannelBufferOutputStream bos
bos.flush();
bos.close();
int len = bos.writtenBytes(); // 请求体字节数
checkPayload(channel, len); // 检查有效负载是否超过限制
Bytes.int2bytes(len, header, 12); // len int 4字节
// write 写入头部header
buffer.writerIndex(savedWriteIndex); // 还原到savedWriteIndex
buffer.writeBytes(header); // write header.
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // 写指针跳到最后
}
1.首先利用SPI机制获取序列化方式
Serialization serialization = getSerialization(channel); // 获取序列化方式
getSerialization方法位于ExchangeCodec的父类AbstractCodec中。
protected Serialization getSerialization(Channel channel) {
return CodecSupport.getSerialization(channel.getUrl());
}
public static Serialization getSerialization(URL url) {
return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
url.getParameter("serialization", "hessian2"));
}
最终获取到的Serialization serialization = Hessian2Serialization对象:
public class Hessian2Serialization implements Serialization {
public static final byte ID = 2;
public byte getContentTypeId() {
return ID;
}
public String getContentType() {
return "x-application/hessian2";
}
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new Hessian2ObjectOutput(out);
}
public ObjectInput deserialize(URL url, InputStream is) throws IOException {
return new Hessian2ObjectInput(is);
}
}
注意:hessian2序列化方式的id是2,该序列化方式ID会写在协议头里传给服务端,服务端根据序列化方式ID获取对应的序列化方式来反序列化请求体。
2.创建16字节header字节数组
byte[] header = new byte[16];
然后填充第1~2个字节为魔数;填充第3个字节为requestFlag、序列化方式ID(这里是2)、twowayFlag或eventFlag;填充第5~12个字节为requestID(long=64bit=8byte)
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
if (req.isEvent()) header[2] |= FLAG_EVENT;
// set request id.
Bytes.long2bytes(req.getId(), header, 4); // long id ,8 字节
3.序列化请求体
首先设置buffer的writerIndex:
int savedWriteIndex = buffer.writerIndex(); // 写索引
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // 设置写索引为0+16,先跳过header,先写请求体的字节
首先存储了buffer当前的writeIndex(写索引),从该位置开始到“该位置+15”这一段会写入header字节数组(例如,[0,15]),从“该位置+16”开始向后写入请求体字节数组(例如,[16, x))。
然后就是设置buffer的writerIndex为当前位置+16,因为接下来要先序列化请求体,然后将请求体写入buffer,最后才会将header写入buffer。
序列化请求体:
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos); // Hessian2ObjectOutput
if (req.isEvent()) {
// 事件
encodeEventData(channel, out, req.getData());
} else {
// 请求
encodeRequestData(channel, out, req.getData());
}
out.flushBuffer(); // Hessian2ObjectOutput写出到ChannelBufferOutputStream bos
bos.flush();
bos.close();
首先新建一个ChannelBufferOutputStream对象(该对象继承自java.io.OutputStream抽象类):
private final ChannelBuffer buffer;
private final int startIndex;
// @param buffer NettyBackedChannelBuffer
public ChannelBufferOutputStream(ChannelBuffer buffer) {
if (buffer == null) {
throw new NullPointerException("buffer");
}
this.buffer = buffer;
startIndex = buffer.writerIndex(); // 写索引
}
buffer为上述的NettyBackedChannelBuffer对象;startIndex == 16
然后获取ObjectOutput对象:
// out--ChannelBufferOutputStream对象
public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
return new Hessian2ObjectOutput(out);
}
private final Hessian2Output mH2o;
// os--ChannelBufferOutputStream对象
public Hessian2ObjectOutput(OutputStream os) {
mH2o = new Hessian2Output(os);
mH2o.setSerializerFactory(Hessian2SerializerFactory.SERIALIZER_FACTORY);
}
public final static int SIZE = 4096;
private final byte[] _buffer = new byte[SIZE];
// the output stream/
protected OutputStream _os;
// Creates a new Hessian output stream, initialized with an
// underlying output stream.
// @param os the underlying output stream.
public Hessian2Output(OutputStream os) {
_os = os;
}
最终得到的ObjectOutput对象:
Hessian2ObjectOutput
-->Hessian2Output mH2o
-->byte[] _buffer = new byte[4096]
-->OutputStream _os = 上述的ChannelBufferOutputStream对象
-->SerializerFactory _serializerFactory = Hessian2SerializerFactory实例
最后执行DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data),该方法是真正进行请求体序列化的地方。
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION));
out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null)
for (int i = 0; i < args.length; i++) {
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
out.writeObject(inv.getAttachments());
}
其中,channel是上述的NettyChannel实例;out是上述的Hessian2ObjectOutput实例;data是Request对象中的data属性(RpcInvocation对象)
Object mData = RpcInvocation对象
-->String methodName = "sayHello"
-->Class<?>[] parameterTypes = [java.lang.String]
-->Object[] arguments = ["world"]
-->Map<String, String> attachments = {
"path" -> "com.alibaba.dubbo.demo.DemoService"
"interface" -> "com.alibaba.dubbo.demo.DemoService"
"version" -> "0.0.0"
}
-->Invoker<?> invoker = DubboInvoker对象
从DubboCodec.encodeRequestData方法中,可以看到只会序列化Request请求体中的RpcInvocation对象的:
- methodName:方法名
- parameterTypes:参数类型
- arguments:参数值
- attachments:附加参数
其中附加参数中的”dubbo”、”path”、”version”还会单独使用out.writeUTF进行序列化。
首先来看一下:
Hessian2ObjectOutput.writeUTF(String v)
-->Hessian2Output.writeString(String value)
-->printString(String v, int strOffset, int length)
通过printString这个方法,将传入的v存储在Hessian2Output对象的byte[] _buffer = new byte[4096]数组中。
Hessian2Output:
// Writes any object to the output stream.
public void writeObject(Object object) throws IOException {
if (object == null) {
writeNull();
return;
}
// 获取序列化器
Serializer serializer = findSerializerFactory().getSerializer(object.getClass());
serializer.writeObject(object, this);
}
public final SerializerFactory findSerializerFactory() {
SerializerFactory factory = _serializerFactory;
if (factory == null)
_serializerFactory = factory = new SerializerFactory();
return factory;
}
SerializerFactory:
private static HashMap _staticSerializerMap;
private HashMap _cachedSerializerMap;
// Returns the serializer for a class.
// @param cl the class of the object that needs to be serialized.
// @return a serializer object for the serialization.
public Serializer getSerializer(Class cl)
throws HessianProtocolException {
Serializer serializer;
serializer = (Serializer) _staticSerializerMap.get(cl);
if (serializer != null)
return serializer;
if (_cachedSerializerMap != null) {
synchronized (_cachedSerializerMap) {
serializer = (Serializer) _cachedSerializerMap.get(cl);
}
if (serializer != null)
return serializer;
}
......
if (serializer != null) {
}
.......
else if (Map.class.isAssignableFrom(cl)) {
if (_mapSerializer == null)
_mapSerializer = new MapSerializer();
serializer = _mapSerializer;
}
......
if (serializer == null)
serializer = getDefaultSerializer(cl);
if (_cachedSerializerMap == null)
_cachedSerializerMap = new HashMap(8);
synchronized (_cachedSerializerMap) {
_cachedSerializerMap.put(cl, serializer);
}
return serializer;
}
Hessian2Output.writeObject(Object object):
首先获取_serializerFactory工厂,这里是Hessian2SerializerFactory实例。其getSerializer(Class cl)方法位于其父类SerializerFactory中:获取序列化器的逻辑是:首先从_staticSerializerMap中获取相关类型的序列化器(_staticSerializerMap中启动时就缓存好一堆类型的序列化器:具体见com.alibaba.com.caucho.hessian.io.SerializerFactory),如果有返回,否则从_cachedSerializerMap缓存中获取相关的类加载器,如果没有,根据类型先创建序列化器(例如new MapSerializer(),当然还有getDefaultSerializer(cl)来兜底),最后放入缓存_cachedSerializerMap中。最后返回创建好的类加载器。
最后调用MapSerializer.writeObject(Object obj, AbstractHessianOutput out)进行序列化。
DubboCodec.encodeRequestData执行完毕之后,已将所有的信息写入了Hessian2Output对象的byte[] _buffer = new byte[4096]数组中。
注意:
如果在将数据写入到_buffer的过程中,字节量超出了4096,会先执行Hessian2Output.flushBuffer()将_buffer中的数据拷贝到PooledUnsafeDirectByteBuf中,之后再往_buffer中写入字节。
最后执行Hessian2ObjectOutput.flushBuffer()
// Hessian2ObjectOutput
public void flushBuffer() throws IOException {
mH2o.flushBuffer();
}
// Hessian2Output
public final void flushBuffer() throws IOException {
int offset = _offset;
if (!_isStreaming && offset > 0) {
_offset = 0;
_os.write(_buffer, 0, offset);
} else if (_isStreaming && offset > 3) {
int len = offset - 3;
_buffer[0] = 'p';
_buffer[1] = (byte) (len >> 8);
_buffer[2] = (byte) len;
_offset = 3;
_os.write(_buffer, 0, offset);
}
}
此处执行ChannelBufferOutputStream.write(byte[] b, int off, int len)
public void write(byte[] b, int off, int len) throws IOException {
if (len == 0) {
return;
}
buffer.writeBytes(b, off, len);
}
Transfers the specified source array's data to this buffer starting at
the current {@code writerIndex} and increases the {@code writerIndex} by
the number of the transferred bytes (= {@code length}).
@param index the first index of the source
@param length the number of bytes to transfer
@throws IndexOutOfBoundsException if the specified {@code srcIndex} is
less than {@code 0}, if {@code srcIndex
+ length} is greater than {@code
src.length}, or if {@code length} is
greater than {@code this.writableBytes}
void writeBytes(byte[] src, int index, int length);
就是将Hessian2Output对象的byte[] _buffer = new byte[4096]数组中的数据转移到buffer中。
NettyBackedChannelBuffer
-->ByteBuf buffer = PooledUnsafeDirectByteBuf
4.将header写入buffer
int len = bos.writtenBytes(); // 计算请求体字节数
checkPayload(channel, len); // 检查有效负载是否超过限制
Bytes.int2bytes(len, header, 12); // 将请求体长度写入header的第13~16个字节(int=4byte)
// write 写入头部header
buffer.writerIndex(savedWriteIndex); // 设置buffer的writerIndex为该次写入的开始位置
buffer.writeBytes(header); // 将header数组写入buffer
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // 写指针跳到最后
到此为止,整个编码就结束了。之后存储了header、body数据的ByteBuf由netty自己来进行网络传输。
来看一下请求编码的byte[] header的最终结构:
- 1~2 byte:魔数
- 3 byte:requestFlag、序列化方式ID、twowayFlag或eventFlag
- 5~12 byte :requestID
- 13~16:请求体长度
这里有一个小插曲:
protected static void checkPayload(Channel channel, long size) throws IOException {
int payload = Constants.DEFAULT_PAYLOAD;
if (channel != null && channel.getUrl() != null) {
payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);
}
if (payload > 0 && size > payload) {
ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
logger.error(e);
throw e;
}
}
Dubbo限制了如果传输的请求体长度大于8M,将会直接抛出异常。