Dubbo线程模型

Thread Model

Posted by Jay on March 31, 2019

Dubbo线程模型

一、Netty的线程模型

在Netty中存在两种线程:boss线程和worker线程。

1.boss线程

作用:

  • accept客户端的连接;
  • 将接收到的连接注册到一个worker线程上。

个数:

  • 通常情况下,服务端每绑定一个端口,开启一个boss线程。
2.worker线程

作用:

  • 处理注册在其身上的连接connection上的各种io事件。

个数:

  • 默认是:CPU核数+1。

注意:

  • 一个worker线程可以注册多个connection;
  • 一个connection只能注册在一个worker线程上。

二、Dubbo的事件派发策略和线程池

1.事件派发

Dubbo基于Netty,有5种派发策略:

  • 默认是all 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件,异常等,心跳事件直接在IO线程处理。即worker线程接收到事件后,将该事件提交到业务线程池中,自己再去处理其他事件。

    // 默认的线程池配置
    class AllChannelHandler extends WrappedChannelHandler {
      
        AllChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
      
        /** NettyChannel实例 */
      	// 连接事件
        @Override
        public void connected(Channel channel) throws RemotingException {
            ExecutorService executorService = getExecutorService();
            try {
                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("connect event", channel, getClass() + " error when process connected event.", t);
            }
        }
      		
      	// 连接断开事件
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            ExecutorService executorService = getExecutorService();
            try {
                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event.", t);
            }
        }
      
        // 收到消息Request/Response
        // @param channel NettyChannel
        // @param message Request/Response
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            // 提供者: FixedThreadPool.getExecutor();消费者:CachedThreadPool.getExecutor()
            ExecutorService executorService = getExecutorService();
            try {
                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //TODO 临时解决线程池满后异常信息无法发送到对端的问题。待重构
                //fix 线程池满了拒绝调用不返回,导致消费者一直等待超时
            	if(message instanceof Request && t instanceof RejectedExecutionException){
            		Request request = (Request)message;
            		if(request.isTwoWay()){
            			String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted, " +
                                "detail msg:" + t.getMessage();
            			Response response = new Response(request.getId(), request.getVersion());
            			response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
            			response.setErrorMessage(msg);
            			channel.send(response);
            			return;
            		}
            	}
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        }
      	
      	// 异常
        @Override
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            ExecutorService executorService = getExecutorService();
            try {
                executorService.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
            } catch (Throwable t) {
                throw new ExecutionException("caught event", channel, getClass() + " error when process caught event.", t);
            }
        }
      
        private ExecutorService getExecutorService() {
            ExecutorService executorService = executor;
            if (executorService == null || executorService.isShutdown()) {
                executorService = SHARED_EXECUTOR;
            }
            return executorService;
        }
      
    }
    
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。即worker线程接收到事件后,由worker执行到底。

    // 不派发线程池。
    public class DirectDispatcher implements Dispatcher {
      
        public static final String NAME = "direct";
      
        @Override
        public ChannelHandler dispatch(ChannelHandler handler, URL url) {
            return handler;
        }
      
    }
    
  • message 只有请求、响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    // 只派发请求、响应消息到业务线程池
    public class MessageOnlyChannelHandler extends WrappedChannelHandler {
      
        public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
      		
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService cexecutor = executor;
            if (cexecutor == null || cexecutor.isShutdown()) {
                cexecutor = SHARED_EXECUTOR;
            }
            try {
                cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
      
    }
    
  • execution 只请求消息派发到线程池,不含响应,响应和其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    // 只派发请求消息到业务线程池
    public class ExecutionChannelHandler extends WrappedChannelHandler {
      
        public ExecutionChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
        }
      
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getExecutorService();
            if (message instanceof Request) {
                try {
                    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
                } catch (Throwable t) {
                    // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                    // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                    // this scenario from happening, but a better solution should be considered later.
                    if (t instanceof RejectedExecutionException) {
                        Request request = (Request) message;
                        if (request.isTwoWay()) {
                            String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                    + ") thread pool is exhausted, detail msg:" + t.getMessage();
                            Response response = new Response(request.getId(), request.getVersion());
                            response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                            response.setErrorMessage(msg);
                            channel.send(response);
                            return;
                        }
                    }
                    throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
                }
            } else {
                handler.received(channel, message);
            }
        }
    }
    
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

    // connect disconnect 保证顺序
    public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {
      
        protected final ThreadPoolExecutor connectionExecutor;
        private final int queuewarninglimit;
      
        public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
            super(handler, url);
            String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            connectionExecutor = new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                    new NamedThreadFactory(threadName, true),
                    new AbortPolicyWithReport(threadName, url)
            );  // FIXME There's no place to release connectionExecutor!
            queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
        }
      
        @Override
        public void connected(Channel channel) throws RemotingException {
            try {
                checkQueueLength();
                connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
            }
        }
      
        @Override
        public void disconnected(Channel channel) throws RemotingException {
            try {
                checkQueueLength();
                connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
            } catch (Throwable t) {
                throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
            }
        }
      
        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getExecutorService();
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
                if (message instanceof Request && t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }
      
        @Override
        public void caught(Channel channel, Throwable exception) throws RemotingException {
            ExecutorService executor = getExecutorService();
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
            } catch (Throwable t) {
                throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
            }
        }
      
        private void checkQueueLength() {
            if (connectionExecutor.getQueue().size() > queuewarninglimit) {
                logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
            }
        }
    }
    
2.业务线程池
  • fixed:固定大小线程池,接收连接、请求等事件时建立线程,不关闭,一直持有。(缺省)

    • coresize:200
    • maxsize:200
    • 队列:SynchronousQueue
    • 回绝策略:AbortPolicyWithReport—dump线程堆栈jstack,之后抛出异常
    public class FixedThreadPool implements ThreadPool {
      
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
      
    }
    
  • cached:缓存线程池,线程空闲一分钟自动删除,需要时重建。

    public class CachedThreadPool implements ThreadPool {
      
        @Override
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
            return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
      
    }
    
  • limited:总线程个数一定的线程池,池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。(与fixed类似)

    public class LimitedThreadPool implements ThreadPool {
      
        public Executor getExecutor(URL url) {
            String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                    queues == 0 ? new SynchronousQueue<Runnable>() :
                            (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                    : new LinkedBlockingQueue<Runnable>(queues)),
                    new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        }
      
    }
    

三、Dubbo服务端

两种线程池:

  • io线程池:Netty的boss和worker线程池。
    • boss:接收、建立connection
    • worker:处理注册在其身上的连接connection上的各种io事件
  • 业务线程池:fixedThreadPool:“DubboServerHandler-172.16.113.42:20881”
    • 与worker配合处理各种IO事件,包含请求

四、Dubbo客户端

两种线程池:

  • io线程池:Netty的boss和worker线程池(Netty3 ??)
    • 同上
  • 业务线程池:cachedThreadPool:“DubboClientHandler-172.16.113.42:20881”
    • 与worker线程配合处理各种IO事件,包含响应,最后得到响应后唤醒被阻塞的主线程

五、Dubbo线程模型图

Dubbo线程模型(官方说明):

整体步骤:(事件派发策略以默认的all为例, 以Netty4为例)

  1. 客户端的主线程发出一个请求后获得future,在执行get时进行阻塞等待;
  2. 服务端使用worker线程(Netty通信模型)接收到请求后,将请求提交到server业务线程池中进行处理;
  3. server业务线程处理完成之后,通过服务端worker线程池将响应结果返回给客户端的worker线程池(Netty通信模型),最后worker线程将响应结果提交到client业务线程池进行处理;
  4. client线程将响应结果填充到future中,然后唤醒等待的主线程,主线程获取结果,返回给客户端。

六、关于Dubbo服务端、客户端线程池理解的重要补充

Dubbo社区有如下Issues:

Dubbo消费者与提供者的线程模型存在以下细节:

1.Dubbo消费者端线程模型

源码NettyClient、AllChannelHandler、WrappedChannelHandler

1共享连接(connections=0)场景
			简版数据结构<host:port(提供者地址), CachedThreadPool(corePoolSize)>
			线程池数量sum(host:port)
			线程池数量示例假设提供者节点有6个则消费者应用会创建6个CachedThreadPool实例
2每服务每连接(connections>0)场景
			简版数据结构<host:port(提供者地址), <ExchangeClient, CachedThreadPool(corePoolSize)>(connections)>
			线程池数量sum(host:port * connections)
			线程池数量示例假设提供者节点有6个且某个服务配置了connections=50则消费者应用对于这个服务会创建6*50=300个CachedThreadPool实例(设计不合理)		

从以上消费者线程数目过多的问题以及消费者线程模型可得,如果在某个场景下,消费者请求并发量极大,同时提供者处理缓慢,导致消费者获取响应结果超时,同时在超时之后,提供者返回消息给消费者,因此消费者接收到大量的超时响应消息,这些消息的处理都是在消费者对应的多个CacheThreadPool中处理的。因此会创建大量的线程,处理超时响应消息后处于空闲状态(默认一分钟后才会销毁),因此消费者端的线程瞬时增长是很惊人的,必须对消费者线程池配置进行限制。Dubbo官方提供了消费者端的线程池配置Dubbo-2013 #2114

2.Dubbo提供者线程模型

源码NettyServer、AllChannelHandler、WrappedChannelHandler

一个服务提供者(对应zk中的一条provider url记录)使用一个线程池实例FixedThreadPool