Dubbo 服务端响应编码
服务端响应编码总体流程:
NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
-->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
-->encodeResponse(Channel channel, ChannelBuffer buffer, Response res)
-->getSerialization(Channel channel) // 获取Hessian2Serialization序列化方式实例
-->CodecSupport.getSerialization(URL url)
-->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))
<!-- 构造一个16字节的byte[16] header -->
-->byte[] header = new byte[16]
-->Bytes.short2bytes(MAGIC, header) // 设置前两个字节为魔数0xdabb
<!-- 第三个字节:序列化方式ID,如果响应是心跳,添加eventFlag -->
-->header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
<!-- 第四个字节:响应状态 -->
-->header[3] = res.getStatus();
<!-- 设置第5~12个字节(long是64bit,即8byte):respID == requestID -->
-->Bytes.long2bytes(res.getId(), header, 4);
<!-- 下面序列化响应体数据 -->
-->new Hessian2ObjectOutput(out)
-->DubboCodec.encodeResponseData(Channel channel, ObjectOutput out, Object data)
-->Bytes.int2bytes(len, header, 12); // 设置第13~16个字节(int是32位,4个字节):响应消息体长度
-->buffer.writeBytes(header); // 将header写入buffer的前16字节
服务端响应编码过程与客户端请求编码过程很相似。
// ExchangeCodec.encodeResponse() 响应编码
// @param channel NettyChannel
// @param buffer NettyBackedChannelBuffer
// @param res Response
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel); // 序列化实例
// header.
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
// set response status.
byte status = res.getStatus();
header[3] = status;
// set request id.
Bytes.long2bytes(res.getId(), header, 4);
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); // buffer先写入响应体
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// out--Hessian2ObjectOutput对象
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) { // 服务端响应ok(包含服务端接口方法执行时的异常)
if (res.isHeartbeat()) { // 心跳
encodeHeartbeatData(channel, out, res.getResult());
} else {
// 正常响应
encodeResponseData(channel, out, res.getResult());
}
} else out.writeUTF(res.getErrorMessage()); // 服务端响应status不是OK,直接写出错误内容
out.flushBuffer();
bos.flush();
bos.close();
int len = bos.writtenBytes(); // 响应体数据长度
checkPayload(channel, len);
Bytes.int2bytes(len, header, 12); // header写入响应体长度
// write
buffer.writerIndex(savedWriteIndex);
buffer.writeBytes(header); // write header. buffer写入header
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// 将已写buffer内容清空
buffer.writerIndex(savedWriteIndex);
// 发送失败信息给Consumer,否则Consumer只能等超时了
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else {
// FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try {
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
}
// 重新抛出收到的异常
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
DubboCodec.encodeResponseData:
// DubboCodec.encodeResponseData() 序列化response body数据
// @param channel NettyChannel
// @param out Hessian2ObjectOutput对象
// @param data RpcResult
@Override
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result = (Result) data;
Throwable th = result.getException();
if (th == null) { // 无异常
Object ret = result.getValue();
if (ret == null) {
out.writeByte(RESPONSE_NULL_VALUE); // 返回值为null
} else {
out.writeByte(RESPONSE_VALUE); // 返回值是正常值,先写RESPONSE_VALUE标志
out.writeObject(ret); // 再写出返回值
}
} else {
// 存在异常
out.writeByte(RESPONSE_WITH_EXCEPTION); // 先写出异常标志
out.writeObject(th); // 再写异常数据
}
}
注意:out.writeByte(RESPONSE_VALUE);写入这个响应类型,是为了后面客户端解码响应Response的时候用的。
客户端请求编码与服务端响应编码的对比:
- 客户端请求编码的byte[] header的最终结构:
- 第1~2 byte:魔数
- 第3 byte:requestFlag、序列化方式ID、twowayFlag或eventFlag
- 第5~12 byte :requestID
- 第13~16 byte:请求体长度
-
服务端响应编码的byte[] header的最终结构:
- 第1~2 byte:魔数
- 第3 byte:序列化方式ID、eventFlag(如果响应信息是心跳信息,添加eventFlag)
- 第4 byte:响应状态,20代表成功OK
- 第5~12 byte :reponseID(实际上==requestID)
- 第13~16 byte:响应体长度