Dubbo Heartbeat Mechanism

Heartbeat

Posted by Jay on March 25, 2019

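Dubbo's heartbeat mechanism:

  • Purpose: detect whether the connection between provider and consumer is still alive, and react appropriately if it has broken.
  • How it works:
    • Provider side: by default, if no message has been read or no message has been written within one heartbeat period (heartbeat, 60s by default), the provider sends a heartbeat request. If nothing has been read for three periods in a row (180s), the provider closes the channel.
    • Consumer side: with the same defaults, the consumer sends a heartbeat request after 60s without a read or without a write. If nothing has been read for three periods in a row (180s), the consumer reconnects.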
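
To make the "three periods" rule concrete, the following Java sketch replays HeartBeatTask's two checks for a channel whose peer goes silent right after connecting. It is a simulation, not Dubbo code. It assumes that each check fires 1 ms after a multiple of heartbeat (a stand-in for scheduling delay) and that every heartbeat request updates lastWrite, as HeartbeatHandler.sent() does.

```java
// Simulation (not Dubbo code) of HeartBeatTask's checks on a channel whose peer went silent.
// Assumptions: checks fire at k * heartbeat + 1 ms, and each heartbeat sent updates lastWrite.
class IdleChannelSimulation {

    /** How many heartbeats were sent, and when the channel was declared dead. */
    static final class Result {
        final int heartbeatsSent;
        final long declaredDeadAt;

        Result(int heartbeatsSent, long declaredDeadAt) {
            this.heartbeatsSent = heartbeatsSent;
            this.declaredDeadAt = declaredDeadAt;
        }
    }

    static Result simulate(long heartbeat, long heartbeatTimeout) {
        long lastRead = 0;   // last message read at connect time; the peer is silent afterwards
        long lastWrite = 0;
        int sent = 0;
        for (int k = 1; ; k++) {
            long now = k * heartbeat + 1; // assumed 1 ms scheduling delay per check
            // Rule 1: idle in either direction for more than one period -> send a heartbeat request.
            if (now - lastRead > heartbeat || now - lastWrite > heartbeat) {
                sent++;
                lastWrite = now;          // the heartbeat request itself counts as a write
            }
            // Rule 2: nothing read for longer than the timeout -> provider closes / consumer reconnects.
            if (now - lastRead > heartbeatTimeout) {
                return new Result(sent, now);
            }
        }
    }

    public static void main(String[] args) {
        Result r = simulate(60_000, 180_000);
        System.out.println(r.heartbeatsSent + " heartbeats, declared dead at " + r.declaredDeadAt + " ms");
    }
}
```

With the defaults (60s/180s), the simulation sends three heartbeats and declares the channel dead at the third check. That is where the "3 times (180s)" figure comes from.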

The sections below analyze the heartbeat mechanism separately for the provider and the consumer.

1. Provider-side heartbeat

//2.3 Start the Netty server to listen for client requests
-->openServer(URL url)
  -->createServer(URL url)
      -->HeaderExchanger.bind(URL url, ExchangeHandler handler)
        -->new DecodeHandler(new HeaderExchangeHandler(handler))
          -->NettyTransporter.bind(URL url, ChannelHandler listener)
            -->new NettyServer(URL url, ChannelHandler handler)
              -->ChannelHandlers.wrapInternal(ChannelHandler handler, URL url)
                -->new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(handler,url)))
              -->getChannelCodec(url)
              -->doOpen() //start the Netty server
        -->new HeaderExchangeServer(Server server)
          -->startHeartbeatTimer()

When the provider starts its Netty server, createServer makes sure the url's parameters map carries a heartbeat setting, adding the default if none is configured:

private ExchangeServer createServer(URL url) {
    // by default, send a readonly event when the server shuts down
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    // server-side transport implementation for this protocol
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    // make sure the server Transporter extension exists
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }

    // codec
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // returns a HeaderExchangeServer
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}

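Here, int DEFAULT_HEARTBEAT = 60 * 1000. When the user does not configure heartbeat (the heartbeat interval), it defaults to 60s: a heartbeat request is sent if no message has been read or written within 60s. So how is heartbeat configured?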

Provider side:

<dubbo:service ...>
    <dubbo:parameter key="heartbeat" value="3000"/>
</dubbo:service>

Consumer side:

<dubbo:reference ...>
    <dubbo:parameter key="heartbeat" value="3000"/>
</dubbo:reference>
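
Both snippets only put heartbeat into the url's parameters. createServer and initClient then add the 60s default only when the key is missing. Below is a minimal Java sketch of that "add if absent" rule. A plain Map stands in for Dubbo's URL, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the addParameterIfAbsent defaulting; a Map stands in for Dubbo's URL parameters.
// HeartbeatDefaulting/effectiveHeartbeat are hypothetical names, not Dubbo API.
class HeartbeatDefaulting {
    static final int DEFAULT_HEARTBEAT = 60 * 1000; // mirrors Constants.DEFAULT_HEARTBEAT

    static int effectiveHeartbeat(Map<String, String> userParams) {
        Map<String, String> params = new HashMap<>(userParams); // copy: Dubbo URLs are immutable too
        // Only fills in the default when the user did not configure "heartbeat".
        params.putIfAbsent("heartbeat", String.valueOf(DEFAULT_HEARTBEAT));
        return Integer.parseInt(params.get("heartbeat"));
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<>();
        configured.put("heartbeat", "3000");                     // <dubbo:parameter key="heartbeat" value="3000"/>
        System.out.println(effectiveHeartbeat(configured));      // 3000
        System.out.println(effectiveHeartbeat(new HashMap<>())); // 60000
    }
}
```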

Back to the call chain. When execution reaches this line:

ChannelHandlers.wrapInternal(ChannelHandler handler, URL url)

a chain of handlers is built, as follows:

MultiMessageHandler
-->handler: HeartbeatHandler
   -->handler: AllChannelHandler
         -->url: providerUrl
         -->executor: FixedExecutor
         -->handler: DecodeHandler
            -->handler: HeaderExchangeHandler
               -->handler: ExchangeHandlerAdapter (DubboProtocol.requestHandler)

This is also the path a request takes after Netty receives it. Note the HeartbeatHandler in the chain.
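
The chain is a set of nested decorators. The following sketch uses hypothetical stand-in classes, not Dubbo's ChannelHandler API, to show the order in which a message visits the layers. It also shows why a heartbeat never reaches DubboProtocol.requestHandler: the HeartbeatHandler layer answers it and returns. The sketch deliberately ignores that the real AllChannelHandler hands messages off to a thread pool.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the handler chain built by wrapInternal (not Dubbo's API).
class HandlerChainDemo {
    interface Handler {
        void received(Object message, List<String> trace);
    }

    static final String HEARTBEAT = "heartbeat"; // hypothetical marker for a heartbeat message

    /** One layer: records its name, optionally swallows heartbeats, then delegates inward. */
    static final class Layer implements Handler {
        private final String name;
        private final boolean consumesHeartbeats; // true only for the HeartbeatHandler stand-in
        private final Handler next;               // null for the innermost handler

        Layer(String name, boolean consumesHeartbeats, Handler next) {
            this.name = name;
            this.consumesHeartbeats = consumesHeartbeats;
            this.next = next;
        }

        @Override
        public void received(Object message, List<String> trace) {
            trace.add(name);
            if (consumesHeartbeats && HEARTBEAT.equals(message)) {
                return; // answered here; never reaches the business handler
            }
            if (next != null) {
                next.received(message, trace);
            }
        }
    }

    static List<String> traceOf(Object message) {
        Handler chain =
            new Layer("MultiMessageHandler", false,
                new Layer("HeartbeatHandler", true,
                    new Layer("AllChannelHandler", false,
                        new Layer("DecodeHandler", false,
                            new Layer("HeaderExchangeHandler", false,
                                new Layer("DubboProtocol.requestHandler", false, null))))));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(traceOf("request")); // visits all six layers
        System.out.println(traceOf(HEARTBEAT)); // stops at HeartbeatHandler
    }
}
```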

Finally, new HeaderExchangeServer(Server server) is executed. The source:

public class HeaderExchangeServer implements ExchangeServer {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // scheduled thread pool that runs the heartbeat task
    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));
    /**
     * The remoting server (a NettyServer instance)
     */
    private final Server server;
    /**
     * Heartbeat timer
     */
    private ScheduledFuture<?> heartbeatTimer;
    /**
     * Heartbeat interval in milliseconds; 0 (the fallback when the url has no heartbeat) disables heartbeats.
     */
    private int heartbeat;
    /**
     * Heartbeat timeout in milliseconds.
     */
    private int heartbeatTimeout;

    private final AtomicBoolean closed = new AtomicBoolean(false);

    // server: the NettyServer instance
    public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeartbeatTimer();
    }
  
    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
                    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                        @Override
                        public Collection<Channel> getChannels() {
                            return Collections.unmodifiableCollection(
                                    HeaderExchangeServer.this.getChannels());
                        }
                    }, heartbeat, heartbeatTimeout),
                    heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        }
    }

    private void stopHeartbeatTimer() {
        try {
            ScheduledFuture<?> timer = heartbeatTimer;
            if (timer != null && !timer.isCancelled()) {
                timer.cancel(true);
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        } finally {
            heartbeatTimer = null;
        }
    }
  
}

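When HeaderExchangeServer is created, it initializes heartbeat (the heartbeat interval) and heartbeatTimeout (the heartbeat timeout). If nothing at all, neither a normal message nor a heartbeat response, has been read from the peer within heartbeatTimeout, the connection is treated as broken and handled accordingly.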

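  • heartbeat falls back to 0 here. startHeartbeatTimer() only schedules heartbeats when heartbeat > 0, and getParameter returns 0 if the url's parameter map has no heartbeat entry. As shown earlier, however, createServer always puts the default heartbeat=60s into the parameter map, so heartbeat is 60s in practice.
  • heartbeatTimeout defaults to heartbeat*3, and the constructor rejects any value below heartbeat*2. Suppose one side sends a heartbeat request and the peer returns nothing within one heartbeat period, neither a normal response nor a heartbeat response. That alone does not prove the connection is broken, because network jitter may simply have dropped packets.
  • scheduled is a scheduled thread pool with a single thread (named "dubbo-remoting-server-heartbeat-thread-*").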
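
The heartbeatTimeout rule from the constructor is pulled out below into a small function so the arithmetic is easy to check. The function name is hypothetical. A null argument stands for "no heartbeat timeout configured in the url".

```java
// Sketch of how HeaderExchangeServer derives heartbeatTimeout; resolveTimeout is a hypothetical name.
class HeartbeatTimeoutRule {
    static int resolveTimeout(int heartbeat, Integer configuredTimeout) {
        // Default: three heartbeat periods.
        int timeout = (configuredTimeout != null) ? configuredTimeout : heartbeat * 3;
        // At least two periods must pass before a connection may be declared dead.
        if (timeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        return timeout;
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeout(60_000, null));    // 180000
        System.out.println(resolveTimeout(60_000, 150_000)); // 150000
    }
}
```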

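Then the heartbeat timer is started:

  • first, any existing heartbeat timer is cancelled;
  • then the scheduled pool runs HeartBeatTask repeatedly, waiting heartbeat milliseconds between the end of one run and the start of the next. The first run happens heartbeat milliseconds after scheduling.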
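
The same timer pattern can be reproduced with the JDK alone. This sketch assumes that NamedThreadFactory builds thread names as prefix + "-thread-" + n and marks threads as daemons. The 50 ms interval and the CountDownLatch only make the demo observable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only sketch of startHeartbeatTimer/stopHeartbeatTimer; names imitate Dubbo's by assumption.
class FixedDelayTimerDemo {
    static ThreadFactory daemonFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + "-thread-" + seq.getAndIncrement());
            t.setDaemon(true); // like NamedThreadFactory(..., true): does not keep the JVM alive
            return t;
        };
    }

    /** Returns true if a dummy heartbeat task ran `runs` times within `waitMs`. */
    static boolean runHeartbeats(long intervalMs, int runs, long waitMs) throws InterruptedException {
        ScheduledExecutorService scheduled =
            Executors.newScheduledThreadPool(1, daemonFactory("dubbo-remoting-server-heartbeat"));
        CountDownLatch latch = new CountDownLatch(runs);
        // Initial delay == delay: the first run happens one interval after scheduling.
        ScheduledFuture<?> timer = scheduled.scheduleWithFixedDelay(
            latch::countDown, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        try {
            return latch.await(waitMs, TimeUnit.MILLISECONDS);
        } finally {
            timer.cancel(true);      // what stopHeartbeatTimer does
            scheduled.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runHeartbeats(50, 3, 2_000));
    }
}
```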

Here is the source of HeartBeatTask:

// heartbeat task
final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
    // channel provider: supplies every channel that needs a heartbeat check
    private ChannelProvider channelProvider;
    // heartbeat interval, 60s by default
    private int heartbeat;
    // heartbeat timeout, 180s by default
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    @Override
    public void run() {
        try {
            long now = System.currentTimeMillis();
            // on the provider side, each channel is a HeaderExchangeChannel wrapping a NettyChannel
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // time of the last read
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP); // read timestamp "READ_TIMESTAMP"
                    // time of the last write
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // write timestamp "WRITE_TIMESTAMP"
                    // no read or no write on the channel for a full period, so send a heartbeat.
                    // If both the last read and the last write fall within heartbeat (60s), they already act as heartbeats; otherwise one must be sent.
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true);
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
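                    // neither a normal message nor a heartbeat response was received within heartbeatTimeout:
                    // if the last read is more than heartbeatTimeout ago, the channel is considered broken (by default
                    // three heartbeats went unanswered); a client channel reconnects, a server channel is closed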
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect(); // client channel: reconnect to the server
                            } catch (Exception ignored) {
                                // ignore
                            }
                        } else {
                            channel.close(); // server channel: close the connection to the client
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        Collection<Channel> getChannels();
    }

}

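HeartBeatTask first calls channelProvider.getChannels() to obtain every channel that needs a heartbeat check. The channelProvider instance is an anonymous inner class created by HeaderExchangeServer when it starts the heartbeat timer: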

private void startHeartbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        return Collections.unmodifiableCollection(
                                HeaderExchangeServer.this.getChannels());
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}

Now look at HeaderExchangeServer.this.getChannels():

public Collection<Channel> getChannels() {
    return (Collection) getExchangeChannels();
}

public Collection<ExchangeChannel> getExchangeChannels() {
    Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
    Collection<Channel> channels = server.getChannels(); // NettyChannels
    if (channels != null && channels.size() > 0) {
        for (Channel channel : channels) {
            exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
        }
    }
    return exchangeChannels;
}

In effect, this returns every channel connection held by the NettyServer, each wrapped in a HeaderExchangeChannel.
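
getExchangeChannels() wraps every NettyChannel through HeaderExchangeChannel.getOrAddChannel. The sketch below assumes, as Dubbo 2.6 appears to do, that the wrapper is cached as an attribute of the underlying channel, and only while that channel is connected. Under that assumption every heartbeat round sees the same HeaderExchangeChannel. RawChannel, ExchangeChannel and the attribute key are simplified, hypothetical stand-ins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "wrap once, cache on the channel"; all types and the key are hypothetical stand-ins.
class WrapperCacheDemo {
    static final String CHANNEL_KEY = "exchange.channel"; // hypothetical attribute key

    static final class RawChannel {
        final Map<String, Object> attributes = new ConcurrentHashMap<>();
        volatile boolean connected = true;
    }

    static final class ExchangeChannel {
        final RawChannel delegate;
        ExchangeChannel(RawChannel delegate) { this.delegate = delegate; }
    }

    static ExchangeChannel getOrAddChannel(RawChannel ch) {
        ExchangeChannel ret = (ExchangeChannel) ch.attributes.get(CHANNEL_KEY);
        if (ret == null) {
            ret = new ExchangeChannel(ch);
            if (ch.connected) { // only cache wrappers for live channels
                ch.attributes.put(CHANNEL_KEY, ret);
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        RawChannel ch = new RawChannel();
        System.out.println(getOrAddChannel(ch) == getOrAddChannel(ch)); // same wrapper each time
    }
}
```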

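With the channels in hand, HeartBeatTask checks each one:

  • if there has been no read or no write within heartbeat, it sends a heartbeat request;
  • if neither a normal message nor a heartbeat response has been received within heartbeatTimeout, the consumer reconnects and the provider closes the channel.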
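
The two rules are pure arithmetic on three timestamps, so they can be isolated from Netty. Below is a sketch of the check HeartBeatTask.run() performs per channel. The class and field names are hypothetical.

```java
// Sketch of HeartBeatTask's per-channel decision as a pure function; names are hypothetical.
class HeartbeatDecision {
    final boolean sendHeartbeat;
    final boolean connectionDead;

    private HeartbeatDecision(boolean sendHeartbeat, boolean connectionDead) {
        this.sendHeartbeat = sendHeartbeat;
        this.connectionDead = connectionDead;
    }

    static HeartbeatDecision decide(long now, Long lastRead, Long lastWrite,
                                    int heartbeat, int heartbeatTimeout) {
        // Idle in either direction for more than one period -> send a heartbeat request.
        boolean send = (lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat);
        // Nothing read (normal reply or heartbeat reply) for longer than the timeout -> dead.
        boolean dead = lastRead != null && now - lastRead > heartbeatTimeout;
        return new HeartbeatDecision(send, dead);
    }

    /** What HeartBeatTask does with a dead channel, depending on which side owns it. */
    static String onDead(boolean isClientChannel) {
        return isClientChannel ? "reconnect" : "close";
    }

    public static void main(String[] args) {
        HeartbeatDecision d = decide(200_001L, 20_000L, 190_000L, 60_000, 180_000);
        System.out.println(d.sendHeartbeat + " " + d.connectionDead); // true true
    }
}
```

Null timestamps (a channel that has just been disconnected) make both checks false, which matches the null guards in the original task.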

The key here is how lastRead and lastWrite get set. First, how they are read:

Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);

So something must be storing these two values on the channel.

Looking at how requests and responses are handled, every request and every response passes through the handlers in this order:

MultiMessageHandler
-->handler: HeartbeatHandler
   -->handler: AllChannelHandler
         -->url: providerUrl
         -->executor: FixedExecutor
         -->handler: DecodeHandler
            -->handler: HeaderExchangeHandler
               -->handler: ExchangeHandlerAdapter (DubboProtocol.requestHandler)

HeartbeatHandler's source is:

public class HeartbeatHandler extends AbstractChannelHandlerDelegate {

    private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);

    public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";

    public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    public HeartbeatHandler(ChannelHandler handler) {
        super(handler);
    }

    public void connected(Channel channel) throws RemotingException {
        setReadTimestamp(channel);
        setWriteTimestamp(channel);
        handler.connected(channel);
    }

    public void disconnected(Channel channel) throws RemotingException {
        clearReadTimestamp(channel);
        clearWriteTimestamp(channel);
        handler.disconnected(channel);
    }

    /**
     *
     * @param channel NettyChannel
     * @param message Request
     * @throws RemotingException
     */
    public void sent(Channel channel, Object message) throws RemotingException {
        setWriteTimestamp(channel);
        // handler: AllChannelHandler
        handler.sent(channel, message);
    }

    /**
     * Called when a message is received
     * @param channel NettyChannel
     * @param message Request
     * @throws RemotingException
     */
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel); // set the read timestamp
        // heartbeat request
        if (isHeartbeatRequest(message)) {
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion()); // same id as the request
                res.setEvent(Response.HEARTBEAT_EVENT); // heartbeat event
                channel.send(res); // reply directly, without passing the message down the chain
                if (logger.isInfoEnabled()) {
                    int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
                                + ", cause: The channel has no data-transmission exceeds a heartbeat period"
                                + (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
                    }
                }
            }
            return;
        }
        // heartbeat response
        if (isHeartbeatResponse(message)) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                        new StringBuilder(32)
                                .append("Receive heartbeat response in thread ")
                                .append(Thread.currentThread().getName())
                                .toString());
            }
            return;
        }
        // handler--AllChannelHandler
        handler.received(channel, message);
    }

    private void setReadTimestamp(Channel channel) {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    }

    // set the write timestamp
    private void setWriteTimestamp(Channel channel) {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    }

    private void clearReadTimestamp(Channel channel) {
        channel.removeAttribute(KEY_READ_TIMESTAMP);
    }

    private void clearWriteTimestamp(Channel channel) {
        channel.removeAttribute(KEY_WRITE_TIMESTAMP);
    }

    private boolean isHeartbeatRequest(Object message) {
        return message instanceof Request && ((Request) message).isHeartbeat();
    }

    private boolean isHeartbeatResponse(Object message) {
        return message instanceof Response && ((Response) message).isHeartbeat();
    }
}

  • on connect: sets lastRead and lastWrite
  • on disconnect: clears lastRead and lastWrite
  • on sending a message: sets lastWrite
  • on receiving a message: sets lastRead
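
HeartbeatHandler's bookkeeping boils down to four attribute writes. The sketch below replays it on a hypothetical FakeChannel, a map of attributes standing in for Dubbo's Channel setAttribute/removeAttribute. An injectable clock keeps the values predictable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Sketch of HeartbeatHandler's timestamp bookkeeping; FakeChannel and Tracker are hypothetical.
class TimestampTracking {
    static final String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
    static final String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";

    static final class FakeChannel {
        final Map<String, Object> attributes = new ConcurrentHashMap<>();
    }

    static final class Tracker {
        private final LongSupplier clock;

        Tracker(LongSupplier clock) { this.clock = clock; }

        void connected(FakeChannel ch) {    // both timestamps start at connect time
            ch.attributes.put(KEY_READ_TIMESTAMP, clock.getAsLong());
            ch.attributes.put(KEY_WRITE_TIMESTAMP, clock.getAsLong());
        }

        void disconnected(FakeChannel ch) { // cleared, so the heartbeat task sees null and skips
            ch.attributes.remove(KEY_READ_TIMESTAMP);
            ch.attributes.remove(KEY_WRITE_TIMESTAMP);
        }

        void sent(FakeChannel ch) {         // any outbound message, heartbeat or not
            ch.attributes.put(KEY_WRITE_TIMESTAMP, clock.getAsLong());
        }

        void received(FakeChannel ch) {     // any inbound message, heartbeat or not
            ch.attributes.put(KEY_READ_TIMESTAMP, clock.getAsLong());
        }
    }

    public static void main(String[] args) {
        long[] now = {1_000L};
        Tracker tracker = new Tracker(() -> now[0]);
        FakeChannel ch = new FakeChannel();
        tracker.connected(ch);
        now[0] = 5_000L;
        tracker.sent(ch); // read stays at 1000, write moves to 5000
        System.out.println(ch.attributes);
    }
}
```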

The message is then handed to AllChannelHandler and eventually to HeaderExchangeHandler, which sets lastRead and lastWrite as well:

@Override
public void connected(Channel channel) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        handler.connected(exchangeChannel);
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

@Override
public void disconnected(Channel channel) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        handler.disconnected(exchangeChannel);
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

// Sends a request (consumer) or a response (provider)
// @param channel channel. NettyChannel
// @param message message. Request/Response
@Override
public void sent(Channel channel, Object message) throws RemotingException {
    Throwable exception = null;
    try {
        channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // set the write timestamp
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // bind a HeaderExchangeChannel to the NettyChannel
        try {
            handler.sent(exchangeChannel, message); // effectively a no-op here
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    } catch (Throwable t) {
        exception = t;
    }
    if (message instanceof Request) {
        Request request = (Request) message;
        DefaultFuture.sent(channel, request); // mark the request as sent
    }
    if (exception != null) {
        if (exception instanceof RuntimeException) {
            throw (RuntimeException) exception;
        } else if (exception instanceof RemotingException) {
            throw (RemotingException) exception;
        } else {
            throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
                    exception.getMessage(), exception);
        }
    }
}

// Called when a message is received
// @param channel NettyChannel.
// @param message message. e.g. Request/Response
@Override
public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // read timestamp
    // get the HeaderExchangeChannel bound to this NettyChannel
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle a request (provider side)
            Request request = (Request) message;
            if (request.isEvent()) { // event request
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) { // two-way: the caller expects a response
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData()); // invoke the method without replying
                }
            }
        } else if (message instanceof Response) {
            // handle a response (consumer side)
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

  • on connect: sets lastRead and lastWrite
  • on disconnect: also sets lastRead and lastWrite (why?)
  • on sending a message: sets lastWrite
  • on receiving a message: sets lastRead

One question remains. According to the handler chain, every request and response passes through the whole chain. Since HeartbeatHandler already sets lastWrite and lastRead, why does HeaderExchangeHandler set them again?

Finally, when the provider decides a connection is dead, it closes the channel. Here is NettyChannel's close method:

// close the connection
public void close() {
    // 1 mark the channel as closed (closed = true)
    try {
        super.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 2 remove this NettyChannel from the global NettyChannel cache
    try {
        removeChannelIfDisconnected(channel);
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 3 clear this NettyChannel's attributes
    try {
        attributes.clear();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
    // 4 close the underlying Netty channel gracefully
    try {
        if (logger.isInfoEnabled()) {
            logger.info("Close netty channel " + channel);
        }
        channel.close();
    } catch (Exception e) {
        logger.warn(e.getMessage(), e);
    }
}

As the code shows, if a consumer dies, the provider's heartbeat check lets it reclaim the related resources. The provider-side heartbeat is therefore necessary.

2. Consumer-side heartbeat

//3.1.1 Create the ExchangeClient: open long-lived connections to the urls found under the providers path during the first service discovery
-->getClients(URL url)
  -->getSharedClient(URL url)
    -->ExchangeClient exchangeClient = initClient(url)
      -->Exchangers.connect(url, requestHandler)
        -->HeaderExchanger.connect(URL url, ExchangeHandler handler)
          -->new DecodeHandler(new HeaderExchangeHandler(handler))
            -->Transporters.connect(URL url, ChannelHandler... handlers)
              -->NettyTransporter.connect(URL url, ChannelHandler listener)
                -->new NettyClient(url, listener)
                  -->new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(handler, url)))
                  -->getChannelCodec(url) // get the Codec2, here a DubboCountCodec instance
                  -->doOpen() // initialize the Netty client
                  -->doConnect() // connect to the Netty server and establish a long-lived connection
          -->new HeaderExchangeClient(Client client, boolean needHeartbeat) // client is the NettyClient instance; needHeartbeat is true
            -->startHeartbeatTimer() // start heartbeat checking

The client sets the heartbeat parameter in initClient(url) (60s by default; see section 1 for how users configure it):

private ExchangeClient initClient(URL url) {
    // client type setting (netty by default)
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
    // protocol codec
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO has serious performance problems and is not allowed for now
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // whether the connection should be lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            // create a lazily connecting client
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // returns a HeaderExchangeClient instance
            client = Exchangers.connect(url, requestHandler);
        }
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
    return client;
}

As on the provider side, heartbeat checking on the consumer ends up running the same HeartBeatTask:

// heartbeat task
final class HeartBeatTask implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
    // channel provider: supplies every channel that needs a heartbeat check
    private ChannelProvider channelProvider;
    // heartbeat interval, 60s by default
    private int heartbeat;
    // heartbeat timeout, 180s by default
    private int heartbeatTimeout;

    HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
        this.channelProvider = provider;
        this.heartbeat = heartbeat;
        this.heartbeatTimeout = heartbeatTimeout;
    }

    @Override
    public void run() {
        try {
            long now = System.currentTimeMillis();
            // on the consumer side, the channel is the HeaderExchangeClient itself
            for (Channel channel : channelProvider.getChannels()) {
                if (channel.isClosed()) {
                    continue;
                }
                try {
                    // time of the last read
                    Long lastRead = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_READ_TIMESTAMP); // read timestamp "READ_TIMESTAMP"
                    // time of the last write
                    Long lastWrite = (Long) channel.getAttribute(
                            HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // write timestamp "WRITE_TIMESTAMP"
                    // no read or no write on the channel for a full period, so send a heartbeat.
                    // If both the last read and the last write fall within heartbeat (60s), they already act as heartbeats; otherwise one must be sent.
                    if ((lastRead != null && now - lastRead > heartbeat)
                            || (lastWrite != null && now - lastWrite > heartbeat)) {
                        Request req = new Request();
                        req.setVersion("2.0.0");
                        req.setTwoWay(true);
                        req.setEvent(Request.HEARTBEAT_EVENT);
                        channel.send(req);
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                                    + ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
                        }
                    }
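                    // neither a normal message nor a heartbeat response was received within heartbeatTimeout:
                    // if the last read is more than heartbeatTimeout ago, the channel is considered broken (by default
                    // three heartbeats went unanswered); a client channel reconnects, a server channel is closed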
                    if (lastRead != null && now - lastRead > heartbeatTimeout) {
                        logger.warn("Close channel " + channel
                                + ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
                        if (channel instanceof Client) {
                            try {
                                ((Client) channel).reconnect(); // client channel: reconnect to the server
                            } catch (Exception ignored) {
                                // ignore
                            }
                        } else {
                            channel.close(); // server channel: close the connection to the client
                        }
                    }
                } catch (Throwable t) {
                    logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
                }
            }
        } catch (Throwable t) {
            logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
        }
    }

    interface ChannelProvider {
        Collection<Channel> getChannels();
    }

}

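The key is the startHeartbeatTimer() method, which is the same as on the provider, with one difference. The provider's ChannelProvider returns all NettyChannels of the NettyServer, whereas the consumer's returns only the current object (the HeaderExchangeClient itself).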
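
The only provider/consumer difference is what the ChannelProvider returns. Below is a sketch with hypothetical stand-in types. The client side assumes, as described above, that HeaderExchangeClient is itself a Channel and hands HeartBeatTask a single-element list containing itself.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins contrasting the server-side and client-side ChannelProvider.
class ChannelProviderDemo {
    interface Channel { String name(); }

    interface ChannelProvider { Collection<Channel> getChannels(); }

    static final class Server {
        final List<Channel> accepted = new ArrayList<>();

        ChannelProvider provider() {
            // Every accepted client connection, as a read-only view.
            return () -> Collections.unmodifiableCollection(accepted);
        }
    }

    static final class Client implements Channel {
        public String name() { return "client"; }

        ChannelProvider provider() {
            // Only this connection: the client object is itself the channel to check.
            return () -> Collections.<Channel>singletonList(this);
        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.accepted.add(() -> "consumer-1");
        server.accepted.add(() -> "consumer-2");
        System.out.println(server.provider().getChannels().size());       // 2
        System.out.println(new Client().provider().getChannels().size()); // 1
    }
}
```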

The consumer's handler chain is exactly the same as the provider's.

Finally, the consumer's reconnect logic: AbstractClient.reconnect

// reconnect to the server
public void reconnect() throws RemotingException {
    // double-checked locking keeps synchronization to a minimum
    if (!isConnected()) {
        connectLock.lock();
        try {
            if (!isConnected()) {
                disconnect();
                connect();
            }
        } finally {
            connectLock.unlock();
        }
    }
}

// disconnect from the server
public void disconnect() {
    connectLock.lock();
    try {
        // cancel the connection-status check and reconnect task
        destroyConnectStatusCheckCommand();
        try {
            Channel channel = getChannel();
            if (channel != null) {
                // close the channel
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            // NettyClient's own disconnect logic
            doDisconnect();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    } finally {
        connectLock.unlock();
    }
}

// connect to the server
protected void connect() throws RemotingException {
    connectLock.lock();
    try {
        if (isConnected()) { // already connected to the server?
            return;
        }
        // schedule a periodic connection-status check that reconnects if the connection drops
        initConnectStatusCheckCommand();
        // NettyClient connects to the server
        doConnect();
        if (!isConnected()) {
            throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                    + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                    + " using dubbo version " + Version.getVersion()
                    + ", cause: Connect wait timeout: " + getTimeout() + "ms.");
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Successful connect to server " + getRemoteAddress()
                        + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                        + " using dubbo version " + Version.getVersion()
                        + ", channel is " + this.getChannel());
            }
        }
        reconnectCount.set(0);
        reconnectErrorLogFlag.set(false);
    } catch (RemotingException e) {
        throw e;
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
                + " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
                + " using dubbo version " + Version.getVersion()
                + ", cause: " + e.getMessage(), e);
    } finally {
        connectLock.unlock();
    }
}

Reconnecting means disconnecting first and then connecting again.
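
The double-checked locking in reconnect() means that when several threads notice the broken connection at once, only one of them actually tears it down and reconnects. The sketch below uses a fake transport: hypothetical counters replace doConnect/doDisconnect. It fires several concurrent reconnect() calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of AbstractClient.reconnect's double-checked locking with a fake transport.
// The counters stand in for NettyClient's doConnect/doDisconnect (hypothetical).
class ReconnectDemo {
    private final ReentrantLock connectLock = new ReentrantLock();
    private volatile boolean connected; // starts false: the connection has dropped
    final AtomicInteger connects = new AtomicInteger();
    final AtomicInteger disconnects = new AtomicInteger();

    boolean isConnected() { return connected; }

    void reconnect() {
        if (!isConnected()) {             // cheap check without the lock
            connectLock.lock();
            try {
                if (!isConnected()) {     // re-check: another thread may already have reconnected
                    disconnect();         // tear down the stale channel first
                    connect();
                }
            } finally {
                connectLock.unlock();
            }
        }
    }

    private void disconnect() { disconnects.incrementAndGet(); connected = false; }

    private void connect() { connects.incrementAndGet(); connected = true; }

    /** Starts `threads` concurrent reconnect() calls and returns how many connects happened. */
    static int concurrentReconnect(int threads) throws InterruptedException {
        ReconnectDemo client = new ReconnectDemo();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try { start.await(); } catch (InterruptedException e) { return; }
                client.reconnect();
            });
            workers[i].start();
        }
        start.countDown(); // release all threads at once
        for (Thread w : workers) {
            w.join();
        }
        return client.connects.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(concurrentReconnect(8)); // 1
    }
}
```

Whatever the interleaving, exactly one connect happens. The threads that lose the race for connectLock find the connection already restored at the second isConnected() check.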

For heartbeats, Netty itself provides idle detection through IdleStateHandler, which could also serve as the basis for a heartbeat mechanism.