Dubbo 服务端接收请求并发送响应流程分析
一、总体流程图
//1.服务端接收请求
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
-->HeartbeatHandler.received(Channel channel, Object message)
-->AllChannelHandler.received(Channel channel, Object message)
-->ExecutorService cexecutor = getExecutorService() //提供者FixedThreadPool
-->executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
-->ChannelEventRunnable.run()
-->DecodeHandler.received(Channel channel, Object message)
-->decode(Object message)
-->HeaderExchangeHandler.received(Channel channel, Object message)
//2.服务端处理请求
-->Response response = handleRequest(exchangeChannel, request)
-->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)//这里的message是RpcInvocation
//首先获取exporter,之后再获取invoker
-->getInvoker(Channel channel, Invocation inv)//组装serviceKey=com.alibaba.dubbo.demo.DemoService:20881
-->(DubboExporter<?>) exporterMap.get(serviceKey)//从Map<String, Exporter<?>> exporterMap中根据serviceKey获取DubboExport实例,
-->exporter.getInvoker()//获取RegistryProtocol$InvokerDelegete实例
//执行Filter链
-->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ClassLoaderFilter.nvoke(Invoker<?> invoker, Invocation invocation)
-->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
-->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
-->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
//执行真正的invoker调用
-->JavassistProxyFactory$AbstractProxyInvoker.invoke(Invocation invocation)
-->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
-->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
-->DemoServiceImpl2.sayHello(String name)
-->new RpcResult(Object result)//将返回值result包装成RpcResult(最后该参数会被包装为Response)
//3.服务端发送响应
-->NettyChannel.send(Response)
-->AbstractPeer.send(Response)
-->NettyChannel.send(Response, boolean sent)
-->NioAcceptedSocketChannel.write(message)//已经是netty的东西,这里的message=Response实例:最重要的是其属性mResult=RpcResult [result=Hello world, response form provider: 172.16.132.166:20881, exception=null]
-->NettyServer.send(Response)
-->AbstractPeer.sent(NettyChannel, Response)
-->MultiMessageHandler.sent(NettyChannel, Response)
-->HeartbeatHandler.sent(NettyChannel, Response)
-->AllChannelHandler.sent(NettyChannel, Response)
-->DecodeHandler.sent(NettyChannel, Response)
-->HeaderExchangeHandler.sent(NettyChannel, Response)
-->DubboProtocol.requestHandler.sent(HeaderExchangeChannel, Response) // 无任何逻辑
二、源码分析
1.接收请求
netty
通信是在netty
的handler
中进行消息的接收处理和发送的。来看一下NettyServer
的handler
。
protected void doOpen() throws Throwable {
// ...
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
// ...
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// ...
}
NettyHandler.messageReceived
:
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// 提供者接受请求:参数:NettyChannel,message: Request handler--NettyServer
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
首先会执行NettyServer
父类AbstractPeer
的received
方法,其调用MultiMessageHandler.received
:
// 收到Request消息
// channel--NettyChannel,message--Request
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
handler.received(channel, obj);
}
} else {
handler.received(channel, message); // handler--HeartbeatHandler
}
}
HeartbeatHandler.received(Channel channel, Object message)
:
// 收到请求
// @param channel NettyChannel
// @param message Request
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel); // 设置读时间戳
// 心跳request
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion()); // 与request invoke id一一对应
res.setEvent(Response.HEARTBEAT_EVENT); // 心跳事件
channel.send(res); // 直接发送
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
// 心跳响应
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
// handler--AllChannelHandler
handler.received(channel, message);
}
AllChannelHandler.received(Channel channel, Object message)
:
protected final ExecutorService executor; // FixedThreadPool
protected final ChannelHandler handler; // DecodeHandler
// 收到Request消息
// @param channel NettyChannel
// @param message Request
public void received(Channel channel, Object message) throws RemotingException {
// 提供者: FixedThreadPool
ExecutorService executorService = getExecutorService();
try {
// 线程池处理任务
executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
//TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
//fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
if(message instanceof Request && t instanceof RejectedExecutionException){
Request request = (Request)message;
if(request.isTwoWay()){
String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
"detail msg:" + t.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
}
}
private ExecutorService getExecutorService() {
ExecutorService executorService = executor;
if (executorService == null || executorService.isShutdown()) {
executorService = SHARED_EXECUTOR;
}
return executorService;
}
这里首先创建了一个线程任务ChannelEventRunnable
,之后丢入线程池执行。
ChannelEventRunnable.run()
:
private final ChannelHandler handler; // DecodeHandler实例
public void run() {
switch (state) {
case CONNECTED:
try {
handler.connected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case DISCONNECTED:
try {
handler.disconnected(channel);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
}
break;
case SENT:
try {
handler.sent(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case RECEIVED: // DecodeHandler实例
try {
handler.received(channel, message);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is " + message, e);
}
break;
case CAUGHT:
try {
handler.caught(channel, exception);
} catch (Exception e) {
logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
+ ", message is: " + message + ", exception is " + exception, e);
}
break;
default:
logger.warn("unknown state: " + state + ", message is " + message);
}
}
DecodeHandler.received(Channel channel, Object message)
:
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
if (message instanceof Request) {
decode(((Request) message).getData()); // 解码Invocation对象
}
if (message instanceof Response) {
decode(((Response) message).getResult());
}
// HeaderExchangeHandler
handler.received(channel, message);
}
2.处理请求
HeaderExchangeHandler.received(Channel channel, Object message)
:
// 收到请求消息并处理
// @param channel NettyChannel.
// @param message message. e.g. Request
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 设置读时间戳
// 根据NettyChannel,获取HeaderExchangeChannel。NettyChannel绑定一个HeaderExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request. 处理请求---提供者端
Request request = (Request) message;
if (request.isEvent()) { // 事件请求
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) { // 双向,有返回值
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData()); // 调用方法但不回复
}
}
} else if (message instanceof Response) {
// 处理响应--消费者端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
private final ExchangeHandler handler; // DubboProtocol$ExchangeHandler
// 处理请求
// @param channel HeaderExchangeChannel
// @param req Request
private Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion()); // 构造response
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg // 请求Request解码出现错误
+ ", may be param class not found");
res.setStatus(Response.BAD_REQUEST);
return res;
}
// find handler by message class.
Object msg = req.getData(); // RpcInvocation
try {
// handle data.
Object result = handler.reply(channel, msg); // 返回值RpcResult
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol$ExchangeHandler.reply(ExchangeChannel channel, Object message)
:
// 响应请求,channel--HeaderExchangeChannel,message--RpcInvocation
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv); // RegistryProtocol$InvokerDelegete
// ...
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());// 设置消费者端地址
return invoker.invoke(inv); // RpcResult包裹具体方法的返回值
}
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
首先是获取invoker
,之后使用该invoker
执行真正调用。
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
// ...
int port = channel.getLocalAddress().getPort(); // provider端端口20881
String path = inv.getAttachments().get(Constants.PATH_KEY); // 服务路径
// ...
// group/path:version:port
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet()
+ ", may be version or group mismatch, or may be graceful shutdown problem (2.5.3 or 3.1.*)"
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()
+ ", message: " + inv);
}
// Filter包装后的Invoker---InvokerDelegete
return exporter.getInvoker();
}
这里serviceKey
是:com.alibaba.dubbo.demo.DemoService:20881
。实际上是group/serviceName:serviceVersion:port
。
Map<String, Exporter<?>> exporterMap
在服务暴露时就已经初始化好了。"com.alibaba.dubbo.demo.DemoService:20881"->DubboExporter实例
。该实例包含一个被Filter
链包裹的Invoker
实例:RegistryProtocol$InvokerDelegete
实例。
之后开始执行Filter
链,直到最后执行到RegistryProtocol$InvokerDelegete.invoke
,该方法实际上是在RegistryProtocol$InvokerDelegete
的父类InvokerWrapper
中执行,InvokerWrapper
调用DelegateProviderMetaDataInvoker
实例,DelegateProviderMetaDataInvoker
实例再调用被其包装的AbstractProxyInvoker
实例的invoke(Invocation invocation)
方法。
private final T proxy; // DemoServiceImpl2实例
public Result invoke(Invocation invocation) throws RpcException {
try {
return new RpcResult(doInvoke(proxy, invocation.getMethodName(),
invocation.getParameterTypes(), invocation.getArguments()));
} catch (InvocationTargetException e) {
return new RpcResult(e.getTargetException());
} catch (Throwable e) {
throw new RpcException("Failed to invoke remote proxy method "
+ invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
由于该AbstractProxyInvoker
实例是JavassistProxyFactory
的匿名内部类,即该AbstractProxyInvoker
实例是在JavassistProxyFactory
中生成的:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
// 包装类
final Wrapper wrapper = Wrapper
.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 生成invoker
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
因此该AbstractProxyInvoker
实例的invoke(Invocation invocation)
方法的调用,先调用其doInvoke
方法,doInvoke
方法转由Wrapper
执行invokeMethod
方法,得到结果后返回并包装为RpcResult
。
这里调用了Wrapper
类的invokeMethod
方法,Wrapper
是一个动态生成的类:
public class Wrapper1 extends Wrapper {
public static String[] pns; // property name array
public static java.util.Map pts = new HashMap();// <property name, property types>
public static String[] mns;// all method name array.
public static String[] dmns;// declared method name array.
public static Class[] mts0; // 方法参数类型数组
// 调用方法
// @param o 服务实现类实例
// @param n 方法名称
// @param p 参数类型
// @param v 参数值
// @return 方法调用结果
// @throws java.lang.reflect.InvocationTargetException
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.alibaba.dubbo.demo.provider.DemoServiceImpl2 w;
try {
w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl2) o);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("sayHello".equals(n) && p.length == 1) {
return ($w) w.sayHello((java.lang.String) v[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl2.");
}
}
这里就执行到了DemoServiceImpl
的sayHello(String name)
方法。之后将返回结果封装为RpcResult
并返回,一直返回到HeaderExchangeHandler
的received(Channel channel, Object message)
。
// 收到请求消息并处理
// @param channel NettyChannel.
// @param message message. e.g. Request
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 设置读时间戳
// 根据NettyChannel,获取HeaderExchangeChannel。NettyChannel绑定一个HeaderExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request. 处理请求---提供者端
Request request = (Request) message;
if (request.isEvent()) { // 事件请求
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) { // 双向,有返回值
Response response = handleRequest(exchangeChannel, request);
channel.send(response); // 发送响应
} else {
handler.received(exchangeChannel, request.getData()); // 调用方法但不回复
}
}
} else if (message instanceof Response) {
// 处理响应--消费者端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
3.发送响应
得到响应结果Response
之后,调用channel.send(response);
将响应结果返回给客户端,这里的channel
是NettyChannel
,执行NettyChannel.send(Object message)
方法,实际上调用的是其父类AbstractPeer
的send(Object message)
方法,AbstractPeer.send(Object message)
方法再调用NettyChannel.send(Object message, boolean sent)
方法:
// message--Response,sent--是否等待响应发送完毕
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent); // 检查Channel是否已经关闭
boolean success = true;
int timeout = 0;
try {
// channel--NioAcceptedSocketChannel
ChannelFuture future = channel.write(message); // 异步响应
if (sent) { // sent=true,等待响应发送完成
// 超时
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
success = future.await(timeout); // 等待写操作完成
}
Throwable cause = future.getCause(); // 若写操作失败,得到失败异常
if (cause != null) {
throw cause;
}
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
+ ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
e);
}
if (!success) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit");
}
}
NettyChannel.send(Object message, boolean sent)
方法调用NioAcceptedSocketChannel.write(Object message)
将消息写回给客户端。
回给客户端之前,同样会经过NettyHandler
的处理。
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
super.writeRequested(ctx, e); // 将写出响应的事件往后传播,即编码后发送
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
// handler--NettyServer, message--Response
handler.sent(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
NettyHandler.writeRequested
方法中首先将写出响应的事件往后传播,即用编码器将Response
编码后发送给客户端。接着调用NettyServer.sent(NettyChannel, Response)
方法,其调用父类AbstractPeer
的sent(Channel ch, Object msg)
方法:
// AbstractPeer
public void sent(Channel ch, Object msg) throws RemotingException {
if (closed) {
return;
}
// MultiMessageHandler
handler.sent(ch, msg);
}
接着调用MultiMessageHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--HeartBeatHandler
handler.sent(channel, message);
}
接着调用HeartBeatHandler.sent(Channel channel, Object message)
,实际上调用的也是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--AllChannelHandler
handler.sent(channel, message);
}
接着调用AllChannelHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类WrappedChannelHandler.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--DecodeHandler实例
handler.sent(channel, message);
}
继续调用DecodeHandler.sent(Channel channel, Object message)
方法,实际上调用的是父类AbstractChannelHandlerDelegate.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
// handler--HeaderExchangeHandler
handler.sent(channel, message);
}
继续调用HeaderExchangeHandler.sent(Channel channel, Object message)
方法:
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // 设置写时间戳
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // NettyChannel绑定一个HeaderExchangeChannel
try {
handler.sent(exchangeChannel, message); // do nothing
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request); // 标记发送状态
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
HeaderExchangeHandler.sent(Channel channel, Object message)
方法中,调用了DubboProtocol
匿名内部类requestHandler
的sent(Channel channel, Object message)
,这里没有什么逻辑。
至此,服务端接受请求、处理请求、发送响应的过程分析完毕。