Dubbo 服务端请求解码
服务端请求解码总体流程:
NettyCodecAdapter$InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
-->ExchangeCodec.decode(Channel channel, ChannelBuffer buffer)
-->buffer.readBytes(header); //读取header byte[]
-->decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
-->检查魔数、总共可读的字节数是否大于等于16
-->获取请求体长度
-->new ChannelBufferInputStream(buffer, len)
-->DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
-->CodecSupport.getSerialization(URL url, Byte id) // 解析出请求头header[2]中的序列化ID,根据该ID获取与请求编码相同的序列化协议
-->Bytes.bytes2long(header, 4) // 获取request id
<!-- 之后创建一个新的Request对象,将request id及后续解析出来的各种request属性塞入该对象中 -->
-->new DecodeableRpcInvocation(channel, req, is, proto)
-->DecodeableRpcInvocation.decode()
-->decode(Channel channel, InputStream input) // 解析请求体参数并将其构造为一个DecodeableRpcInvocation,最终塞到Request对象的data属性中
-->new Hessian2ObjectInput(InputStream is)
-->Hessian2ObjectInput.readObject(Class<?>)/readUTF() 反序列化
-->Hessian2Input.readObject(Class cl)/readString()
-->ChannelBufferInputStream.read(byte[] b, int off, int len)
总体流程:
- 包装请求传过来的ByteBuf为NettyBackedChannelBuffer(简称buffer)
- 从buffer中读取header
- 之后检查魔数、总共可读的字节数是否大于等于16
- 获取请求体body长度
- 解析出请求头header[2]中的序列化ID,根据该ID获取与请求编码相同的序列化协议
- 获取requestID
- 创建Request对象,将requestID及后续解析出来的各种request属性塞入该对象中
- 反序列化请求体body,并将其设在DecodeableRpcInvocation中,最后该对象设在Request对象的data属性中
解码是在NettyCodecAdapter中:
// Inbound decoder: turns the bytes Netty hands over into decoded messages.
private class InternalDecoder extends ByteToMessageDecoder {

    // Decodes every complete message currently available in input and appends it to out.
    // @param ctx   the Netty handler context; its channel is wrapped in a NettyChannel
    // @param input the Netty ByteBuf to read from
    // @param out   receives each non-null decoded message
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        // Wrap Netty's ByteBuf so the codec can read it as a ChannelBuffer.
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        // Look up (or register) the NettyChannel that wraps the underlying Netty channel.
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            do {
                // Remember where this attempt started so it can be undone.
                final int frameStart = message.readerIndex();
                final Object decoded = codec.decode(channel, message);

                if (decoded == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    // Not enough bytes yet: rewind and stop until more data arrives.
                    message.readerIndex(frameStart);
                    break;
                }

                // A result other than NEED_MORE_INPUT must have consumed at least one byte.
                if (frameStart == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }

                // Hand every successfully decoded, non-null message to the pipeline.
                if (decoded != null) {
                    out.add(decoded);
                }
            } while (message.readable());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}
一、创建ChannelBuffer
ChannelBuffer message = new NettyBackedChannelBuffer(input);
与客户端请求编码类似,最终得到的message:
NettyBackedChannelBuffer
-->ByteBuf buffer = PooledUnsafeDirectByteBuf
二、获取NettyChannel
之后先获取io.netty.channel.Channel实例,然后将其包装在NettyChannel中。
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
与客户端请求编码类似,最终得到的channel:
-->Channel channel = NioSocketChannel
-->ChannelHandler handler = NettyServer
-->URL url = dubbo://192.168.0.100:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.0.100&bind.port=20881&cellinvokemode=sharing&channel.readonly.sent=true&codec=dubbo&default.server=netty4&dubbo=2.0.0&generic=false&heartbeat=60000&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello,sayBye&pid=51897&side=provider&timestamp=1555424608881
三、进行解码
这里的codec是:
Codec2 codec = DubboCountCodec
-->DubboCodec codec = new DubboCodec()
DubboCountCodec:
// Decodes as many complete messages as the buffer currently holds.
// @param channel the channel the bytes arrived on (a NettyChannel in the flow described here)
// @param buffer  the buffer to read from (a NettyBackedChannelBuffer in the flow described here)
// @return Codec2.DecodeResult.NEED_MORE_INPUT when nothing could be decoded, the message itself
//         when exactly one was decoded, otherwise the MultiMessage holding all decoded messages
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // Reader index just after the last fully decoded message (initially the current index).
    int checkpoint = buffer.readerIndex();
    MultiMessage batch = MultiMessage.create();
    while (true) {
        Object decoded = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == decoded) {
            // Incomplete message: rewind so the next call starts again from the checkpoint.
            buffer.readerIndex(checkpoint);
            break;
        }
        // Complete message: collect it, log its length and advance the checkpoint.
        batch.addMessage(decoded);
        logMessageLength(decoded, buffer.readerIndex() - checkpoint);
        checkpoint = buffer.readerIndex();
    }
    if (batch.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    return batch.size() == 1 ? batch.get(0) : batch;
}
MultiMessage:
// Messages in the order they were added. Typed as List<Object> instead of a raw List
// so the compiler checks element access; any object may still be stored.
private final List<Object> messages = new ArrayList<Object>();

// Private: instances are obtained through create().
private MultiMessage() {
}

// Creates a new, empty MultiMessage.
public static MultiMessage create() {
    return new MultiMessage();
}

// Appends msg to the end of the message list.
// @param msg the message to add
public void addMessage(Object msg) {
    messages.add(msg);
}
DubboCodec的父类ExchangeCodec:
// Reads the header bytes available in the buffer and delegates to the four-argument decode.
// @param channel the channel the bytes arrived on (a NettyChannel in the flow described here)
// @param buffer  the buffer to read from (a NettyBackedChannelBuffer in the flow described here)
// @return whatever the four-argument decode returns
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // Total number of bytes that can be read right now.
    final int available = buffer.readableBytes();
    // At most HEADER_LENGTH bytes are read; fewer when less data has arrived so far.
    final int headerSize = Math.min(available, HEADER_LENGTH);
    final byte[] headerBytes = new byte[headerSize];
    buffer.readBytes(headerBytes);
    return decode(channel, buffer, available, headerBytes);
}
// Decodes one message whose header bytes have already been read from the buffer.
// Returns DecodeResult.NEED_MORE_INPUT when fewer than HEADER_LENGTH bytes, or fewer than
// header + body bytes, are available; otherwise returns the result of decodeBody.
// Data that does not start with the magic number is handed to super.decode instead.
// @param channel the channel the bytes arrived on (a NettyChannel in the flow described here)
// @param buffer the buffer to read from (a NettyBackedChannelBuffer in the flow described here)
// @param readable total number of bytes that were readable before the header was read
// @param header the header bytes already read from the buffer
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
// Branch taken when the leading bytes do not match the magic number.
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
// Pull every remaining readable byte into header so the whole input can be scanned.
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// Search for the magic number further along; if found at index i, move the reader index
// back so the buffer is positioned at it, and keep only the i bytes that precede it.
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// Let the parent codec handle the bytes collected in header.
return super.decode(channel, buffer, readable, header);
}
// check length.
if (readable < HEADER_LENGTH) { // fewer bytes are readable than a full header (16 bytes per this document)
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
int len = Bytes.bytes2int(header, 12); // body length, read from the header starting at offset 12
checkPayload(channel, len); // NOTE(review): checkPayload is not shown here; the original note says it checks whether the body exceeds 8M - confirm
int tt = len + HEADER_LENGTH;
if (readable < tt) { // fewer bytes are readable than header + body (the original note attributes this to a split packet)
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
return decodeBody(channel, is, header); // decode the body
} finally {
// If decodeBody left part of the body unread, warn and skip the remainder.
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
DubboCodec.decodeBody():
// Decodes the message body. The response branch is elided in this excerpt ("...");
// the request branch builds a Request from the header fields and the body stream.
// @param channel the channel the bytes arrived on (a NettyChannel in the flow described here)
// @param is stream over the body bytes (a ChannelBufferInputStream in the flow described here)
// @param header the header bytes
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // proto: serialization id taken from header[2]
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // look up the Serialization instance for that id
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// Response received on the client side.
// decode response.
...
} else {
// Request received on the server side.
// decode request.
Request req = new Request(id); // build the Request
req.setVersion("2.0.0");
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) { // heartbeat
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) { // event
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
// Build the DecodeableRpcInvocation over the body stream
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// and decode the request body right here.
inv.decode();
} else {
// Otherwise hand the invocation a stream over readMessageData(is); decode() is not called here.
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request: flag it as broken and carry the failure as its data
req.setBroken(true);
req.setData(t);
}
return req;
}
}
就是构造Request对象,重点是构造其中的data属性(实际上是一个DecodeableRpcInvocation实例)。
DecodeableRpcInvocation:
// Decodes the request body at most once. A failure does not propagate: it is logged,
// the request is marked broken and the failure is stored as the request's data.
public void decode() throws Exception {
    // Skip when already decoded, or when there is no channel or stream to decode from.
    if (hasDecoded || channel == null || inputStream == null) {
        return;
    }
    try {
        decode(channel, inputStream);
    } catch (Throwable e) {
        if (log.isWarnEnabled()) {
            log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
        }
        request.setBroken(true);
        request.setData(e);
    } finally {
        // Set whether or not decoding succeeded, so it is never attempted again.
        hasDecoded = true;
    }
}
// Decodes the request body from input and stores the result on this invocation.
// @param channel the channel the request arrived on (a NettyChannel in the flow described here)
// @param input   stream over the body bytes (a ChannelBufferInputStream in the flow described here)
// @return this invocation
// @throws IOException if reading fails, or if a ClassNotFoundException is raised while reading
//         the invocation data (it is chained as the cause)
public Object decode(Channel channel, InputStream input) throws IOException {
    // Create the ObjectInput for this request's serialization type
    // (a Hessian2ObjectInput in the flow described in this document).
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    // The read order below must be exactly the order used when the request was serialized.
    setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF());
    setAttachment(Constants.PATH_KEY, in.readUTF());
    setAttachment(Constants.VERSION_KEY, in.readUTF());

    setMethodName(in.readUTF());
    try {
        Object[] args;
        Class<?>[] pts;
        // Parameter-type descriptor; an empty descriptor means the method takes no arguments.
        String desc = in.readUTF();
        if (desc.length() == 0) {
            pts = DubboCodec.EMPTY_CLASS_ARRAY;
            args = DubboCodec.EMPTY_OBJECT_ARRAY;
        } else {
            pts = ReflectUtils.desc2classArray(desc);
            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    // Best effort: a failed argument is logged and left null.
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        setParameterTypes(pts);

        // Merge the attachments carried in the body into those already set above.
        Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
        if (map != null && map.size() > 0) {
            Map<String, String> attachment = getAttachments();
            if (attachment == null) {
                attachment = new HashMap<String, String>();
            }
            attachment.putAll(map);
            setAttachments(attachment);
        }
        //decode argument ,may be callback
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }

        setArguments(args);

    } catch (ClassNotFoundException e) {
        // Same message text as before, but the exception is now also chained as the cause
        // so callers can still reach the original ClassNotFoundException.
        throw new IOException(StringUtils.toString("Read invocation data failed.", e), e);
    }
    return this;
}
上述的setXXX方法,实际上就是为当前的DecodeableRpcInvocation设置各种属性;in.readUTF()和in.readObject()都是反序列化的方法,前者将byte[]反序列化为String,后者将byte[]反序列化为Object。
至此,服务端请求解码过程结束。