Dubbo 客户端响应解码
客户端响应解码整体流程:
NettyCodecAdapter$InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
-->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
-->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
-->DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
-->ExchangeCodec.decode(Channel channel, ChannelBuffer buffer)
-->buffer.readBytes(header); // 读取header byte[]
-->decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
-->检查魔数、总共可读的字节数是否大于等于16
-->获取响应消息体长度
-->new ChannelBufferInputStream(buffer, len)
-->DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
-->CodecSupport.getSerialization(URL url, Byte id) // 解析出响应头header[2]中的序列化ID,根据该ID获取与请求编码相同的序列化协议
-->Bytes.bytes2long(header, 4) // 获取respID
<!-- 之后创建一个新的Response对象,将respID及后续解析出来的各种resp属性塞入该对象中 -->
-->new DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
-->DecodeableRpcResult.decode()
-->decode(Channel channel, InputStream input) // 解析响应体参数并将其构造为一个DecodeableRpcResult,最终塞到Response对象的mResult属性中
-->ObjectInput in = new Hessian2ObjectInput(InputStream is)
-->反序列化:in.readObject()
客户端响应解码与服务端请求解码类似,下面只介绍不同点。
// DubboCodec.decodeBody 解码响应body
// @param channel NettyChannel
// @param is ChannelBufferInputStream
// @param header 头部数据
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK); // 获取序列化id
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); // 获取序列话方式
// get request id.
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {// 客户端收到的响应解码
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT); // 心跳事件
}
// get status.
byte status = header[3];
res.setStatus(status); // 设置响应状态
if (status == Response.OK) { // 服务端响应OK(包含服务端服务接口方法调用时的异常)
try {
Object data;
if (res.isHeartbeat()) {
// 心跳
// deserialize(s, channel.getUrl(), is)--Hessian2ObjectInput(is)
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
// 事件
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
} else {
// 正常业务响应
DecodeableRpcResult result;
if (channel.getUrl().getParameter(
Constants.DECODE_IN_IO_THREAD_KEY,
Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data); // 设置Result
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR); // 客户端解码出现异常
res.setErrorMessage(StringUtils.toString(t));
}
} else {
// 服务端响应status!=OK,直接读取错误消息
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
...
}
}
DecodeableRpcResult.decode:
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc result failed: " + e.getMessage(), e);
}
// 客户端解码出现错误
response.setStatus(Response.CLIENT_ERROR);
response.setErrorMessage(StringUtils.toString(e));
} finally {
hasDecoded = true;
}
}
}
// @param channel NettyChannel
// @param input ChannelBufferInputStream
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input); // Hessian2ObjectInput(input)对象
byte flag = in.readByte(); // 返回结果标志,这个值是服务端响应编码进来的,用来表示响应的返回类型
switch (flag) {
// 返回值为null,不作处理
case DubboCodec.RESPONSE_NULL_VALUE:
break;
// 存在正常返回值
case DubboCodec.RESPONSE_VALUE:
try {
Type[] returnType = RpcUtils.getReturnTypes(invocation); // 获取返回值类型
// 读取并设置result
// TODO: 返回值类型为void,为什么还要读取in.readObject()??
setValue(returnType == null || returnType.length == 0 ? in.readObject() :
(returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
: in.readObject((Class<?>) returnType[0], returnType[1])));
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
// 服务端接口调用时抛出了异常
case DubboCodec.RESPONSE_WITH_EXCEPTION:
try {
Object obj = in.readObject(); // 读取异常
if (obj instanceof Throwable == false)
throw new IOException("Response data error, expect Throwable, but get " + obj);
setException((Throwable) obj); // 设置服务端接口调用异常exception
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read response data failed.", e));
}
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}
setValue:设置DecodeableRpcResult的Object result属性。
setException:设置DecodeableRpcResult的Throwable exception属性。
至此,客户端解码响应Response流程结束。