How a Dubbo Server Receives a Request and Sends the Response

Server-side request handling

Posted by Jay on March 11, 2019

I. Overall flow diagram

//1. Server receives the request
NettyHandler.messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-->MultiMessageHandler.received(Channel channel, Object message)
  -->HeartbeatHandler.received(Channel channel, Object message)
    -->AllChannelHandler.received(Channel channel, Object message)
      -->ExecutorService executor = getExecutorService() // provider side: FixedThreadPool
      -->executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message))
        -->ChannelEventRunnable.run()
          -->DecodeHandler.received(Channel channel, Object message)
            -->decode(Object message)
            -->HeaderExchangeHandler.received(Channel channel, Object message)
              //2. Server handles the request
              -->Response response = handleRequest(exchangeChannel, request)
                -->DubboProtocol.requestHandler.reply(ExchangeChannel channel, Object message)// message here is an RpcInvocation
                  // first get the exporter, then get the invoker from it
                  -->getInvoker(Channel channel, Invocation inv)// builds serviceKey=com.alibaba.dubbo.demo.DemoService:20881
                    -->(DubboExporter<?>) exporterMap.get(serviceKey)// looks up the DubboExporter instance by serviceKey in Map<String, Exporter<?>> exporterMap
                    -->exporter.getInvoker()// returns the RegistryProtocol$InvokerDelegete instance
                  // run the Filter chain
                  -->EchoFilter.invoke(Invoker<?> invoker, Invocation inv)
                    -->ClassLoaderFilter.invoke(Invoker<?> invoker, Invocation invocation)
                      -->GenericFilter.invoke(Invoker<?> invoker, Invocation inv)
                        -->ContextFilter.invoke(Invoker<?> invoker, Invocation invocation)
                          -->TraceFilter.invoke(Invoker<?> invoker, Invocation invocation)
                            -->TimeoutFilter.invoke(Invoker<?> invoker, Invocation invocation)
                              -->MonitorFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                -->ExceptionFilter.invoke(Invoker<?> invoker, Invocation invocation)
                                  // perform the actual invoker call
                                  -->JavassistProxyFactory$AbstractProxyInvoker.invoke(Invocation invocation)
                                    -->JavassistProxyFactory$AbstractProxyInvoker.doInvoke
                                      -->Wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments)
                                        -->DemoServiceImpl2.sayHello(String name)
                                    -->new RpcResult(Object result)// wraps the return value in an RpcResult (which is ultimately wrapped in a Response)
              //3. Server sends the response
              -->NettyChannel.send(Response)
                -->AbstractPeer.send(Response)
                  -->NettyChannel.send(Response, boolean sent)
                    -->NioAcceptedSocketChannel.write(message)// already inside Netty; message is the Response instance, whose most important field is mResult=RpcResult [result=Hello world, response form provider: 172.16.132.166:20881, exception=null]
                      -->NettyServer.sent(NettyChannel, Response)
                        -->AbstractPeer.sent(NettyChannel, Response)
                          -->MultiMessageHandler.sent(NettyChannel, Response)
                            -->HeartbeatHandler.sent(NettyChannel, Response)
                              -->AllChannelHandler.sent(NettyChannel, Response)
                                -->DecodeHandler.sent(NettyChannel, Response)
                                  -->HeaderExchangeHandler.sent(NettyChannel, Response)
                                    -->DubboProtocol.requestHandler.sent(HeaderExchangeChannel, Response) // no logic here
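
The nesting in the receive path above is a chain of decorators: each handler does one job and then forwards to the handler it wraps. The following is a minimal sketch of that idea, not Dubbo's code. `SketchHandler`, `MultiMessageStep`, `HeartbeatStep`, the string channel, and the `"HEARTBEAT"` marker are all hypothetical simplifications introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class HandlerChainSketch {
    // Hypothetical, simplified stand-in for a channel handler: only the "received" event.
    interface SketchHandler {
        void received(String channel, Object message);
    }

    // Unpacks a batch and forwards each element, in the spirit of MultiMessageHandler.
    static final class MultiMessageStep implements SketchHandler {
        private final SketchHandler next;

        MultiMessageStep(SketchHandler next) {
            this.next = next;
        }

        @Override
        public void received(String channel, Object message) {
            if (message instanceof List) {
                for (Object item : (List<?>) message) {
                    next.received(channel, item);
                }
            } else {
                next.received(channel, message);
            }
        }
    }

    // Stops heartbeats and forwards everything else, in the spirit of HeartbeatHandler.
    static final class HeartbeatStep implements SketchHandler {
        static final String HEARTBEAT = "HEARTBEAT"; // hypothetical marker value
        private final SketchHandler next;

        HeartbeatStep(SketchHandler next) {
            this.next = next;
        }

        @Override
        public void received(String channel, Object message) {
            if (HEARTBEAT.equals(message)) {
                return; // handled at this layer, never reaches business code
            }
            next.received(channel, message);
        }
    }

    // Builds the chain outermost-first and returns what the innermost handler saw.
    static List<Object> deliver(Object message) {
        List<Object> seenByBusiness = new ArrayList<>();
        SketchHandler business = (channel, msg) -> seenByBusiness.add(msg);
        SketchHandler chain = new MultiMessageStep(new HeartbeatStep(business));
        chain.received("channel-1", message);
        return seenByBusiness;
    }

    public static void main(String[] args) {
        // prints [a, b]
        System.out.println(deliver(Arrays.asList("a", HeartbeatStep.HEARTBEAT, "b")));
    }
}
```

Because every step only knows the handler it wraps, steps can be added or reordered without touching the business handler at the end of the chain.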

II. Source code analysis

1. Receiving the request

In the Netty transport, messages are received, handled, and sent through NettyHandler. Let's first look at how NettyServer registers this handler:

protected void doOpen() throws Throwable {
    // ...
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    // ...
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // ...
}

NettyHandler.messageReceived:

public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // provider receives the request; arguments: the NettyChannel and the message (a Request); handler is the NettyServer
        handler.received(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

The call first lands in the received method of AbstractPeer, the parent class of NettyServer, which in turn calls MultiMessageHandler.received:

// A Request message is received
// channel: NettyChannel, message: Request
public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof MultiMessage) {
        MultiMessage list = (MultiMessage) message;
        for (Object obj : list) {
            handler.received(channel, obj);
        }
    } else {
        handler.received(channel, message); // handler: HeartbeatHandler
    }
}

HeartbeatHandler.received(Channel channel, Object message):

// A request is received
// @param channel NettyChannel
// @param message Request
public void received(Channel channel, Object message) throws RemotingException {
    setReadTimestamp(channel); // set the read timestamp
    // heartbeat request
    if (isHeartbeatRequest(message)) {
        Request req = (Request) message;
        if (req.isTwoWay()) {
            Response res = new Response(req.getId(), req.getVersion()); // matches the request's invoke id one-to-one
            res.setEvent(Response.HEARTBEAT_EVENT); // heartbeat event
            channel.send(res); // send immediately
            if (logger.isInfoEnabled()) {
                int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                if (logger.isDebugEnabled()) {
                    logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                            + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                            + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                }
            }
        }
        return;
    }
    // heartbeat response
    if (isHeartbeatResponse(message)) {
        if (logger.isDebugEnabled()) {
            logger.debug(
                    new StringBuilder(32)
                            .append("Receive heartbeat response in thread ")
                            .append(Thread.currentThread().getName())
                            .toString());
        }
        return;
    }
    // handler--AllChannelHandler
    handler.received(channel, message);
}

AllChannelHandler.received(Channel channel, Object message):

protected final ExecutorService executor; // FixedThreadPool
protected final ChannelHandler handler; // DecodeHandler

// A Request message is received
// @param channel NettyChannel
// @param message Request
public void received(Channel channel, Object message) throws RemotingException {
    // provider side: FixedThreadPool
    ExecutorService executorService = getExecutorService();
    try {
        // hand the task to the thread pool
        executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
        // TODO temporary fix for error information not reaching the peer once the thread pool is full; to be refactored
        // fix: when the pool is full the call is rejected without a reply, so the consumer waits until it times out
        if (message instanceof Request && t instanceof RejectedExecutionException) {
            Request request = (Request) message;
            if (request.isTwoWay()) {
                String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
                        "detail msg:" + t.getMessage();
                Response response = new Response(request.getId(), request.getVersion());
                response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                response.setErrorMessage(msg);
                channel.send(response);
                return;
            }
        }
        throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
    }
}

private ExecutorService getExecutorService() {
    ExecutorService executorService = executor;
    if (executorService == null || executorService.isShutdown()) {
        executorService = SHARED_EXECUTOR;
    }
    return executorService;
}

This first wraps the event in a ChannelEventRunnable task and then hands that task to the thread pool for execution.
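
The dispatch-with-fallback idea can be tried in isolation with a plain `ThreadPoolExecutor`. This is a minimal sketch, not Dubbo's implementation: the class `DispatchSketch`, the outcome labels, and the pool configuration (one worker, no queue, so that rejection is easy to trigger) are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class DispatchSketch {
    // Hypothetical outcome labels; the handler shown above sends an error Response instead.
    static final String DISPATCHED = "DISPATCHED";
    static final String POOL_EXHAUSTED = "THREADPOOL_EXHAUSTED";

    // Hands the task to the pool; a rejection becomes an immediate answer instead of silence.
    static String dispatch(ThreadPoolExecutor pool, Runnable task) {
        try {
            pool.execute(task);
            return DISPATCHED;
        } catch (RejectedExecutionException e) {
            return POOL_EXHAUSTED;
        }
    }

    // One worker and no queue, so a task submitted while the worker is busy is rejected.
    static String[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            String first = dispatch(pool, () -> awaitQuietly(release)); // occupies the only worker
            String second = dispatch(pool, () -> { });                  // no free worker, no queue slot
            return new String[] {first, second};
        } finally {
            release.countDown(); // let the first task finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        String[] outcome = demo();
        System.out.println(outcome[0] + ", " + outcome[1]);
    }
}
```

Answering the caller right away on rejection is what keeps a two-way request from hanging until the consumer's own timeout fires.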

ChannelEventRunnable.run():

private final ChannelHandler handler; // DecodeHandler instance

public void run() {
    switch (state) {
        case CONNECTED:
            try {
                handler.connected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case DISCONNECTED:
            try {
                handler.disconnected(channel);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
            }
            break;
        case SENT:
            try {
                handler.sent(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
        case RECEIVED: // handler is the DecodeHandler instance
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
            break;
        case CAUGHT:
            try {
                handler.caught(channel, exception);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is: " + message + ", exception is " + exception, e);
            }
            break;
        default:
            logger.warn("unknown state: " + state + ", message is " + message);
    }
}

DecodeHandler.received(Channel channel, Object message):

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }

    if (message instanceof Request) {
        decode(((Request) message).getData()); // decode the Invocation object
    }

    if (message instanceof Response) {
        decode(((Response) message).getResult()); 
    }
    // HeaderExchangeHandler
    handler.received(channel, message);
}

2. Handling the request

HeaderExchangeHandler.received(Channel channel, Object message):

// Receives a message and handles it
// @param channel NettyChannel.
// @param message message. e.g. Request
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // set the read timestamp
    // look up the HeaderExchangeChannel for this NettyChannel; each NettyChannel is bound to one HeaderExchangeChannel
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request (provider side)
            Request request = (Request) message;
            if (request.isEvent()) { // event request
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) { // two-way: a reply is expected
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData()); // invoke the method without replying
                }
            }
        } else if (message instanceof Response) {
            // handle response (consumer side)
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

private final ExchangeHandler handler; // DubboProtocol$ExchangeHandler

// Handles the request
// @param channel HeaderExchangeChannel
// @param req Request
private Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
    Response res = new Response(req.getId(), req.getVersion()); // build the response
    if (req.isBroken()) {
        Object data = req.getData();

        String msg;
        if (data == null) {
            msg = null;
        } else if (data instanceof Throwable) {
            msg = StringUtils.toString((Throwable) data);
        } else {
            msg = data.toString();
        }
        res.setErrorMessage("Fail to decode request due to: " + msg // decoding the Request failed
                            + ", may be param class not found");
        res.setStatus(Response.BAD_REQUEST);

        return res;
    }
    // find handler by message class.
    Object msg = req.getData(); // RpcInvocation
    try {
        // handle data.
        Object result = handler.reply(channel, msg); // the return value is an RpcResult
        res.setStatus(Response.OK);
        res.setResult(result);
    } catch (Throwable e) {
        res.setStatus(Response.SERVICE_ERROR);
        res.setErrorMessage(StringUtils.toString(e));
    }
    return res;
}
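
The shape of handleRequest, where every outcome becomes a reply carrying the request's id, can be sketched without any Dubbo types. The class `HandleRequestSketch`, its `Status` enum, and the `Reply` holder below are hypothetical stand-ins for illustration; they are not the real Response API.

```java
import java.util.function.Function;

class HandleRequestSketch {
    // Hypothetical status labels standing in for the Response status constants.
    enum Status { OK, BAD_REQUEST, SERVICE_ERROR }

    // Hypothetical reply holder: the id always mirrors the request id.
    static final class Reply {
        final long id;
        final Status status;
        final Object result;
        final String errorMessage;

        Reply(long id, Status status, Object result, String errorMessage) {
            this.id = id;
            this.status = status;
            this.result = result;
            this.errorMessage = errorMessage;
        }
    }

    // A broken request is answered without calling the service; a failing service
    // is answered with an error status instead of letting the exception escape.
    static Reply handleRequest(long requestId, boolean broken, Object data,
                               Function<Object, Object> service) {
        if (broken) {
            return new Reply(requestId, Status.BAD_REQUEST, null,
                    "Fail to decode request due to: " + data);
        }
        try {
            return new Reply(requestId, Status.OK, service.apply(data), null);
        } catch (Throwable e) {
            return new Reply(requestId, Status.SERVICE_ERROR, null, String.valueOf(e));
        }
    }

    public static void main(String[] args) {
        Reply ok = handleRequest(7L, false, "world", name -> "Hello " + name);
        // prints 7 OK Hello world
        System.out.println(ok.id + " " + ok.status + " " + ok.result);
    }
}
```

Reusing the request id in the reply is what lets the consumer match a response to the call that is waiting for it.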

DubboProtocol$ExchangeHandler.reply(ExchangeChannel channel, Object message):

// Replies to the request; channel: HeaderExchangeChannel, message: RpcInvocation
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv); // RegistryProtocol$InvokerDelegete
        // ...
        RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // record the consumer's address
        return invoker.invoke(inv); // RpcResult wrapping the method's actual return value
    }
    throw new RemotingException(channel, "Unsupported request: "
            + (message == null ? null : (message.getClass().getName() + ": " + message))
            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}

It first obtains the invoker and then uses that invoker to perform the actual call.

protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    // ...
    int port = channel.getLocalAddress().getPort(); // provider port, 20881
    String path = inv.getAttachments().get(Constants.PATH_KEY); // service path
    // ...
    // group/path:version:port
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));

    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

    if (exporter == null) {
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet()
                + ", may be version or group mismatch, or may be graceful shutdown problem (2.5.3 or 3.1.*)"
                + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()
                + ", message: " + inv);
    }

    // Invoker wrapped by the Filter chain: InvokerDelegete
    return exporter.getInvoker();
}

Here the serviceKey is com.alibaba.dubbo.demo.DemoService:20881. Its general form is group/serviceName:serviceVersion:port.

The Map<String, Exporter<?>> exporterMap is already populated when the service is exported: "com.alibaba.dubbo.demo.DemoService:20881" -> DubboExporter instance. That exporter holds an Invoker wrapped by the Filter chain: a RegistryProtocol$InvokerDelegete instance.
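
To make the key format concrete, here is a small sketch that builds a key of the form group/serviceName:serviceVersion:port and uses it for a map lookup. It assumes that an empty group or version is simply left out, which is consistent with the example key above; any further special-casing the real helper may apply is not modeled. The class `ServiceKeySketch` and the string placeholder for the exporter are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceKeySketch {
    // Builds group/path:version:port, omitting group and version when they are absent.
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    public static void main(String[] args) {
        // A plain string stands in for the exporter object in this sketch.
        Map<String, String> exporterMap = new ConcurrentHashMap<>();
        exporterMap.put("com.alibaba.dubbo.demo.DemoService:20881", "exporter for DemoService");

        String key = serviceKey(20881, "com.alibaba.dubbo.demo.DemoService", null, null);
        System.out.println(key + " -> " + exporterMap.get(key));
    }
}
```

Since the lookup is an exact string match, a consumer that sends a different group or version produces a different key and misses the exporter, which is the "version or group mismatch" case named in the exception message.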

The Filter chain then runs until it finally reaches RegistryProtocol$InvokerDelegete.invoke. That method is actually implemented in InvokerWrapper, the parent class of RegistryProtocol$InvokerDelegete. InvokerWrapper calls the DelegateProviderMetaDataInvoker instance, which in turn calls invoke(Invocation invocation) on the AbstractProxyInvoker instance it wraps.
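
One common way to get such a chain is to wrap the target invoker from the last filter back to the first, so that the first filter ends up outermost. The sketch below illustrates that technique under simplified, hypothetical types (`Invoker` and `Filter` here are nested stand-ins that work on strings, not Dubbo's interfaces), and it does not claim to reproduce how Dubbo assembles its chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FilterChainSketch {
    // Hypothetical, string-based stand-ins for an invoker and a filter.
    interface Invoker {
        String invoke(String invocation);
    }

    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    // Wraps the target from the last filter to the first, so filters run in list order.
    static Invoker build(Invoker target, List<Filter> filters) {
        Invoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    // Records the order in which the filters and the target are reached.
    static List<String> demo() {
        List<String> order = new ArrayList<>();
        Filter echo = (next, inv) -> {
            order.add("EchoFilter");
            return next.invoke(inv);
        };
        Filter exception = (next, inv) -> {
            order.add("ExceptionFilter");
            return next.invoke(inv);
        };
        Invoker target = inv -> {
            order.add("target");
            return "Hello " + inv;
        };
        order.add(build(target, Arrays.asList(echo, exception)).invoke("world"));
        return order;
    }

    public static void main(String[] args) {
        // prints [EchoFilter, ExceptionFilter, target, Hello world]
        System.out.println(demo());
    }
}
```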

private final T proxy; // DemoServiceImpl2 instance

public Result invoke(Invocation invocation) throws RpcException {
    try {
        return new RpcResult(doInvoke(proxy, invocation.getMethodName(),
                invocation.getParameterTypes(), invocation.getArguments()));
    } catch (InvocationTargetException e) {
        return new RpcResult(e.getTargetException());
    } catch (Throwable e) {
        throw new RpcException("Failed to invoke remote proxy method "
                + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

This AbstractProxyInvoker instance is an anonymous inner class of JavassistProxyFactory; that is, it is created in JavassistProxyFactory:

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle class names containing '$' correctly
    // wrapper class
    final Wrapper wrapper = Wrapper
            .getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    // create the invoker
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

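So a call to this AbstractProxyInvoker instance's invoke(Invocation invocation) first calls its doInvoke method, which delegates to Wrapper.invokeMethod; the result is then returned and wrapped in an RpcResult.

The Wrapper whose invokeMethod is called here is a dynamically generated class: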

public class Wrapper1 extends Wrapper {

    public static String[] pns; // property name array
    public static java.util.Map pts = new HashMap();// <property name, property types>
    public static String[] mns;// all method name array.
    public static String[] dmns;// declared method name array.
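    public static Class[] mts0; // parameter types of the method

    // Invokes a method
    // @param o instance of the service implementation class
    // @param n method name
    // @param p parameter types
    // @param v argument values
    // @return result of the method call
    // @throws java.lang.reflect.InvocationTargetException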
    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.alibaba.dubbo.demo.provider.DemoServiceImpl2 w;
        try {
            w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl2) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals(n) && p.length == 1) {
                return ($w) w.sayHello((java.lang.String) v[0]);
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl2.");
    }
}
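
The generated source above uses Javassist's `($w)` cast, so it does not compile as ordinary Java. A hand-written equivalent of the same dispatch idea, calling the target directly by comparing the method name instead of going through `java.lang.reflect.Method`, might look like the following sketch. `GreeterImpl` and `WrapperSketch` are hypothetical names, and the JDK's `NoSuchMethodException` is used in place of Dubbo's own exception class.

```java
import java.lang.reflect.InvocationTargetException;

class WrapperSketch {
    // Hypothetical service implementation standing in for DemoServiceImpl2.
    static final class GreeterImpl {
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    // Dispatches by method name and parameter count with a direct call, no reflection.
    static Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] arguments)
            throws InvocationTargetException, NoSuchMethodException {
        GreeterImpl impl;
        try {
            impl = (GreeterImpl) target;
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            if ("sayHello".equals(methodName) && parameterTypes.length == 1) {
                return impl.sayHello((String) arguments[0]);
            }
        } catch (Throwable e) {
            // any failure inside the service method is reported as the invocation's target exception
            throw new InvocationTargetException(e);
        }
        throw new NoSuchMethodException("Not found method \"" + methodName + "\" in class GreeterImpl.");
    }

    // Convenience helper for the demo: turns checked exceptions into a readable string.
    static String call(String methodName, String argument) {
        try {
            Object result = invokeMethod(new GreeterImpl(), methodName,
                    new Class<?>[] {String.class}, new Object[] {argument});
            return String.valueOf(result);
        } catch (InvocationTargetException | NoSuchMethodException e) {
            return "error: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(call("sayHello", "world")); // prints Hello world
        System.out.println(call("sayBye", "world"));   // prints error: NoSuchMethodException
    }
}
```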

This is where DemoServiceImpl2.sayHello(String name) finally runs. Its return value is wrapped in an RpcResult and returned up the call stack, all the way back to HeaderExchangeHandler.received(Channel channel, Object message):

// Receives a message and handles it
// @param channel NettyChannel.
// @param message message. e.g. Request
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // set the read timestamp
    // look up the HeaderExchangeChannel for this NettyChannel; each NettyChannel is bound to one HeaderExchangeChannel
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request (provider side)
            Request request = (Request) message;
            if (request.isEvent()) { // event request
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) { // two-way: a reply is expected
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response); // send the response
                } else {
                    handler.received(exchangeChannel, request.getData()); // invoke the method without replying
                }
            }
        } else if (message instanceof Response) {
            // handle response (consumer side)
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

3. Sending the response

Once the Response is available, channel.send(response) returns it to the client. The channel here is a NettyChannel, so NettyChannel.send(Object message) runs; this is actually the send(Object message) method inherited from its parent class AbstractPeer, and AbstractPeer.send(Object message) in turn calls NettyChannel.send(Object message, boolean sent):

// message: Response; sent: whether to wait until the response has been written
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent); // checks whether the Channel is already closed

    boolean success = true;
    int timeout = 0;
    try {
        // channel: NioAcceptedSocketChannel
        ChannelFuture future = channel.write(message); // asynchronous write
        if (sent) { // sent=true: wait for the write to complete
            // timeout
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout); // wait for the write to complete
        }
        Throwable cause = future.getCause(); // if the write failed, get the failure cause
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + ", cause: " + e.getMessage() + ", may be graceful shutdown problem (2.5.3 or 3.1.*)"
                + ", see http://git.caimi-inc.com/middleware/hokage/issues/14",
                e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}

NettyChannel.send(Object message, boolean sent) calls NioAcceptedSocketChannel.write(Object message) to write the message back to the client.
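
The "write asynchronously, optionally wait with a timeout" pattern in this method can be sketched with the JDK's `CompletableFuture` standing in for Netty's ChannelFuture. This is an illustration under that substitution, not the Netty API; the class `SendSketch` and its boolean return value are hypothetical, and unlike the method above it only inspects a failure when it actually waits.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendSketch {
    // Returns false only when we waited and the write did not finish within the timeout.
    static boolean send(CompletableFuture<Void> writeFuture, boolean sent, long timeoutMillis) {
        if (!sent) {
            return true; // fire and forget: do not wait for the write to complete
        }
        try {
            writeFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the write", e);
        } catch (ExecutionException e) {
            // the write itself failed: surface the underlying cause
            throw new IllegalStateException("Failed to send message", e.getCause());
        }
    }

    public static void main(String[] args) {
        // an already completed write: prints true
        System.out.println(send(CompletableFuture.<Void>completedFuture(null), true, 100L));
        // a write that never completes: prints false after about 50 ms
        System.out.println(send(new CompletableFuture<Void>(), true, 50L));
    }
}
```

Waiting is opt-in because blocking on every write would tie the calling thread to network latency; the default path returns as soon as the write has been queued.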

Before it reaches the client, the message also passes through NettyHandler.

public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    super.writeRequested(ctx, e); // propagate the write event onward, i.e. encode and send
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        // handler: NettyServer, message: Response
        handler.sent(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}

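NettyHandler.writeRequested first propagates the write event onward, so the encoder encodes the Response and it is sent to the client. It then calls NettyServer.sent(NettyChannel, Response), which is the sent(Channel ch, Object msg) method of the parent class AbstractPeer: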

// AbstractPeer
public void sent(Channel ch, Object msg) throws RemotingException {
    if (closed) {
        return;
    }
    // MultiMessageHandler
    handler.sent(ch, msg);
}

Next comes MultiMessageHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler: HeartbeatHandler
    handler.sent(channel, message);
}

Next comes HeartbeatHandler.sent(Channel channel, Object message), which is likewise the sent(Channel channel, Object message) method of its parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--AllChannelHandler
    handler.sent(channel, message);
}

Next comes AllChannelHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class WrappedChannelHandler:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler: DecodeHandler instance
    handler.sent(channel, message);
}

The call continues to DecodeHandler.sent(Channel channel, Object message), which is actually the sent(Channel channel, Object message) method of its parent class AbstractChannelHandlerDelegate:

public void sent(Channel channel, Object message) throws RemotingException {
    // handler--HeaderExchangeHandler
    handler.sent(channel, message);
}

The call continues to HeaderExchangeHandler.sent(Channel channel, Object message):

public void sent(Channel channel, Object message) throws RemotingException {
    Throwable exception = null;
    try {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // set the write timestamp
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // each NettyChannel is bound to one HeaderExchangeChannel
        try {
            handler.sent(exchangeChannel, message); // do nothing
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    } catch (Throwable t) {
        exception = t;
    }
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request); // mark the request as sent
    }
    if (exception != null) {
        if (exception instanceof RuntimeException) {
            throw (RuntimeException) exception;
        } else if (exception instanceof RemotingException) {
            throw (RemotingException) exception;
        } else {
            throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
                    exception.getMessage(), exception);
        }
    }
}

Inside HeaderExchangeHandler.sent(Channel channel, Object message), the sent(Channel channel, Object message) method of requestHandler, an anonymous inner class in DubboProtocol, is called; it contains no real logic.

This completes the analysis of how the server receives a request, handles it, and sends the response.