Dubbo 心跳机制
Dubbo的心跳机制:
- 目的:检测provider与consumer之间的connection连接是否还连接着,如果连接断了,需要作出相应的处理。
- 原理:
- provider端的心跳默认是在heartbeat(默认是60s)内如果没有读消息或写消息,就会发送心跳请求消息,如果连着3次(180s)没有读消息,provider会关闭channel。
- consumer端的心跳默认是在60s内如果没有读消息或写消息,就会发送心跳请求消息,如果连着3次(180s)没有读消息,consumer会进行重连。
下面分provider与consumer进行心跳机制的分析。
一、provider端的心跳机制
//2.3 开启netty服务端监听客户端请求
-->openServer(URL url)
-->createServer(URL url)
-->HeaderExchanger.bind(URL url, ExchangeHandler handler)
-->new DecodeHandler(new HeaderExchangeHandler(handler)))
-->NettyTransporter.bind(URL url, ChannelHandler listener)
-->new NettyServer(URL url, ChannelHandler handler)
-->ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
-->new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(handler,url)))
-->getChannelCodec(url)
-->doOpen() //开启netty服务
-->new HeaderExchangeServer(Server server)
-->startHeatbeatTimer()
服务端在开启Netty服务时, 在调用createServer时,会从url的parameters map中获取heartbeat配置,代码如下:
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 协议的服务端实现类型
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 检查服务端Transporter扩展是否存在
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 编解码器
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 返回HeaderExchangerServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
其中:int DEFAULT_HEARTBEAT = 60 * 1000,即当用户没有配置heartbeat(心跳间隔时间)时,默认heartbeat=60s(即60s内没有读消息或写消息,就会发送心跳请求信息)。那么这个heartbeat到底该怎么配?
provider端:
<dubbo:service ...>
<dubbo:parameter key="heartbeat" value="3000"/>
</dubbo:service>
consumer端:
<dubbo:reference ...>
<dubbo:parameter key="heartbeat" value="3000"/>
</dubbo:reference>
再来看调用链,当执行到这一句:
ChannelHandler.wrapInternal(ChannelHandler handler, URL url)
会形成一个handler调用链,调用链如下:
MultiMessageHandler
-->handler: HeartbeatHandler
-->handler: AllChannelHandler
-->url: providerUrl
-->executor: FixedExecutor
-->handler: DecodeHandler
-->handler: HeaderExchangeHandler
-->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)
这也是Netty接收到请求后的处理链路,注意其中有一个HeartbeatHandler。
最后,执行new HeaderExchangeServer(Server server),来看源码:
public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// 定时任务线程池
private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));
/**
* 远程服务器 NettyServer实例
*/
private final Server server;
/**
* 心跳定时器
*/
private ScheduledFuture<?> heartbeatTimer;
// 心跳间隔
private int heartbeat;
/**
* 心跳超时,毫秒。缺省0,不会执行心跳。
*/
private int heartbeatTimeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
// server---NettyServer实例
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeartbeatTimer();
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
private void stopHeartbeatTimer() {
try {
ScheduledFuture<?> timer = heartbeatTimer;
if (timer != null && !timer.isCancelled()) {
timer.cancel(true);
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
} finally {
heartbeatTimer = null;
}
}
}
创建HeaderExchangeServer时,初始化了heartbeat(心跳间隔时间)和heartbeatTimeout(心跳响应超时时间:即如果最终发送的心跳在这个时间内都没有返回,则做出相应的处理)。
- heartbeat默认是0(从startHeatbeatTimer()方法可以看出只有heartbeat>0的情况下,才会发心跳,这里heartbeat如果从url的parameter map中获取不到,就是0,但是在前边看到Dubbo会默认设置heartbeat=60s到parameter map中,所以此处的heartbeat=60s);
- heartbeatTimeout:默认是heartbeat*3。(原因:假设一端发出一次heartbeatRequest,另一端在heartbeat内没有返回任何响应——包括正常请求响应和心跳响应,此时不能认为是连接断了,因为有可能是网络抖动等原因导致了丢包)
- scheduled是一个含有一个线程的定时任务线程池(其中的线程名字为:”dubbo-remoting-server-heartbeat-thread-*”)
之后启动心跳定时任务:
- 首先如果原来有心跳定时任务,关闭原来的定时任务;
- 之后启动scheduled中的定时线程,从启动该线程开始,每隔heartbeat执行一次HeartBeatTask任务(第一次执行是在启动线程后heartbeat时)
来看一下HeartBeatTask的源码:
// 心跳任务
final class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
// 底层的Channels-->channel获取器,用于获取所有需要进行心跳检测的channel
private ChannelProvider channelProvider;
// 心跳间隔,默认60s
private int heartbeat;
// 心跳超时时间,默认180s
private int heartbeatTimeout;
HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
}
@Override
public void run() {
try {
long now = System.currentTimeMillis();
// channel————HeaderExchangeChannel,包装NettyChannel(提供者)
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
// 获取上一次读消息的时间
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 读时间戳 "READ_TIMESTAMP"
// 获取上一次写消息的时间
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 写时间戳 "READ_TIMESTAMP"
// channel没有数据读写,因此发送心跳包
// 如果最后一次读和写在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,需要程序发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 正常消息和心跳在heartbeatTimeout都没接收到
// 如果最后一次读的时间距离现在已经超过heartbeatTimeout了,我们认为channel已经断了(因为在这个过程中,
// 发送了三次心跳都没反应),此时客户端channel进行重连
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect(); // 客户端channel,重连服务端
} catch (Exception ignored) {
// ignore
}
} else {
channel.close(); // 服务端channel,则直接将与客户端的连接channel关闭
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
interface ChannelProvider {
Collection<Channel> getChannels();
}
}
HeartBeatTask首先通过channelProvider.getChannels获取所有需要心跳检测的channel,channelProvider实例是HeaderExchangeServer中启动心跳定时任务的时候创建的内部类。
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
来看一下HeaderExchangeServer.this.getChannels():
public Collection<Channel> getChannels() {
return (Collection) getExchangeChannels();
}
public Collection<ExchangeChannel> getExchangeChannels() {
Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();
Collection<Channel> channels = server.getChannels(); // NettyChannels
if (channels != null && channels.size() > 0) {
for (Channel channel : channels) {
exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));
}
}
return exchangeChannels;
}
实际上就是获取NettyServer中的全部channel连接。
获取到需要心跳检测的channel后,对每一个channel进行如下判断:
- 如果在heartbeat内没有进行读操作或者写操作,则发送心跳请求;
- 如果正常消息和心跳在heartbeatTimeout都没接收到,consumer端会进行重连,provider端会关闭channel。
这里比较关键的是lastRead和lastWrite的设置。先来看一下获取:
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
说明有地方在设置这两个值到channel中。
从请求和响应处理来看,无论是请求还是响应都会按照这个顺序处理一遍。
MultiMessageHandler
-->handler: HeartbeatHandler
-->handler: AllChannelHandler
-->url: providerUrl
-->executor: FixedExecutor
-->handler: DecodeHandler
-->handler: HeaderExchangeHandler
-->handler: ExchangeHandlerAdapter(DubboProtocol.requestHandler)
其中HeartbeatHandler源码如下:
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public HeartbeatHandler(ChannelHandler handler) {
super(handler);
}
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
handler.disconnected(channel);
}
/**
*
* @param channel NettyChannel
* @param message Request
* @throws RemotingException
*/
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
// handel--AllChannelHandler
handler.sent(channel, message);
}
/**
* 收到消息
* @param channel NettyChannel
* @param message Request
* @throws RemotingException
*/
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel); // 设置读时间戳
// 心跳request
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion()); // 与request invoke id一一对应
res.setEvent(Response.HEARTBEAT_EVENT); // 心跳事件
channel.send(res); // 直接发送
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
// 心跳响应
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug(
new StringBuilder(32)
.append("Receive heartbeat response in thread ")
.append(Thread.currentThread().getName())
.toString());
}
return;
}
// handler--AllChannelHandler
handler.received(channel, message);
}
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
// 设置写时间
private void setWriteTimestamp(Channel channel) {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
}
private void clearReadTimestamp(Channel channel) {
channel.removeAttribute(KEY_READ_TIMESTAMP);
}
private void clearWriteTimestamp(Channel channel) {
channel.removeAttribute(KEY_WRITE_TIMESTAMP);
}
private boolean isHeartbeatRequest(Object message) {
return message instanceof Request && ((Request) message).isHeartbeat();
}
private boolean isHeartbeatResponse(Object message) {
return message instanceof Response && ((Response) message).isHeartbeat();
}
}
- 连接完成时:设置lastRead和lastWrite
- 连接断开时:清空lastRead和lastWrite
- 发送消息时:设置lastWrite
- 接收消息时:设置lastRead
之后交由AllChannelHandler进行处理。之后会一直交由HeaderExchangeHandler进行处理。其对lastRead和lastWrite也做了设置:
@Override
public void connected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
@Override
public void disconnected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.disconnected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
// 发送请求(consumer)/响应(provider)
// @param channel channel. NettyChannel
// @param message message. Request/Response
@Override
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); // 设置写时间戳
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); // NettyChannel绑定一个HeaderExchangeChannel
try {
handler.sent(exchangeChannel, message); // do nothing
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request); // 标记发送状态
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
// 收到消息
// @param channel NettyChannel.
// @param message message. e.g. Request/Response
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); // 读时间戳
// 根据NettyChannel,获取HeaderExchangeChannel。NettyChannel绑定一个HeaderExchangeChannel
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request. 处理请求---提供者端
Request request = (Request) message;
if (request.isEvent()) { // 事件请求
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) { // 双向,有返回值
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData()); // 调用方法但不回复
}
}
} else if (message instanceof Response) {
// 处理响应--消费者端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
- 连接完成时:设置lastRead和lastWrite
- 连接断开时:也设置lastRead和lastWrite(为什么)
- 发送消息时:设置lastWrite
- 接收消息时:设置lastRead
这里有个疑问,从handler链来看,无论是请求还是响应都会按照handler链来处理一遍。那么在HeartbeatHandler中已经进行了lastWrite和lastRead的设置,为什么还要在HeaderExchangeHandler中再设置一遍?
最后,provider端认为连接断了,则会关闭channel。来看一下NettyChannel的close方法:
// 关闭连接
public void close() {
// 1 将close属性设为true
try {
super.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 2 从全局NettyChannel缓存中将当前的NettyChannel删掉
try {
removeChannelIfDisconnected(channel);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 3 清空当前的NettyChannel中的attributes属性
try {
attributes.clear();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
// 4 关闭Netty的channel,执行netty的channel的优雅关闭
try {
if (logger.isInfoEnabled()) {
logger.info("Close netty channel " + channel);
}
channel.close();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
从上边代码来看,假设consumer端挂了,provider端的心跳检测机制可以进行相关的资源回收,所以provider端的心跳检测机制是有必要的。
二、consumer端的心跳机制
//3.1.1 创建ExchangeClient,对第一次服务发现providers路径下的相关url建立长连接
-->getClients(URL url)
-->getSharedClient(URL url)
-->ExchangeClient exchangeClient = initClient(url)
-->Exchangers.connect(url, requestHandler)
-->HeaderExchanger.connect(URL url, ExchangeHandler handler)
-->new DecodeHandler(new HeaderExchangeHandler(handler)))
-->Transporters.connect(URL url, ChannelHandler... handlers)
-->NettyTransporter.connect(URL url, ChannelHandler listener)
-->new NettyClient(url, listener)
-->new MultiMessageHandler(new HeartbeatHandler(new AllChannelHandler(handler, url)))
-->getChannelCodec(url) // 获取Codec2,这里是DubboCountCodec实例
-->doOpen() // 初始化netty客户端
-->doConnect() // 连接Netty服务端,建立长连接
-->new HeaderExchangeClient(Client client, boolean needHeartbeat) // 上述client为NettyClient实例,needHeartbeat为true
-->startHeatbeatTimer() // 启动心跳检测
客户端在initClient(url)中设置了heartbeat参数(默认为60s,用户自己设置的方式见“一”中所讲),如下:
private ExchangeClient initClient(URL url) {
// client type setting.---netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 协议编解码
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在严重性能问题,暂时不允许使用
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
//设置连接应该是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
// 建立懒连接client
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 返回HeaderExchangeClient实例
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
与provider类似,来看一下最后开启心跳检测的地方。
// 心跳任务
final class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
// 底层的Channels-->channel获取器,用于获取所有需要进行心跳检测的channel
private ChannelProvider channelProvider;
// 心跳间隔,默认60s
private int heartbeat;
// 心跳超时时间,默认180s
private int heartbeatTimeout;
HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
}
@Override
public void run() {
try {
long now = System.currentTimeMillis();
// channel————HeaderExchangeClient
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
// 获取上一次读消息的时间
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 读时间戳 "READ_TIMESTAMP"
// 获取上一次写消息的时间
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 写时间戳 "READ_TIMESTAMP"
// channel没有数据读写,因此发送心跳包
// 如果最后一次读和写在heartbeat时间(60s)内,则最后一次的读和写本身可以看作心跳;否则,需要程序发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 正常消息和心跳在heartbeatTimeout都没接收到
// 如果最后一次读的时间距离现在已经超过heartbeatTimeout了,我们认为channel已经断了(因为在这个过程中,
// 发送了三次心跳都没反应),此时客户端channel进行重连
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect(); // 客户端channel,重连服务端
} catch (Exception ignored) {
// ignore
}
} else {
channel.close(); // 服务端channel,则直接将与客户端的连接channel关闭
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
interface ChannelProvider {
Collection<Channel> getChannels();
}
}
主要看一下startHeartbeatTimer()方法,与provider相同,只是provider是获取NettyServer的所有的NettyChannel,而consumer只是获取当前的对象。
consumer的handler处理链与provider完全相同。
最后来看一下consumer的重连机制:AbstractClient.reconnect
// 重连服务端
public void reconnect() throws RemotingException {
// 用"双重检查加锁",减少使用同步
if (!isConnected()) {
connectLock.lock();
try {
if (!isConnected()) {
disconnect();
connect();
}
} finally {
connectLock.unlock();
}
}
}
// 断开与服务器端的连接。
public void disconnect() {
connectLock.lock();
try {
// 取消连接状态检测及重连任务
destroyConnectStatusCheckCommand();
try {
Channel channel = getChannel();
if (channel != null) {
// 关闭channnel
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// NettyClient的断开连接动作
doDisconnect();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
} finally {
connectLock.unlock();
}
}
// 连接到服务器端。
protected void connect() throws RemotingException {
connectLock.lock();
try {
if (isConnected()) { // 是否已连接到服务端
return;
}
// 定时检测连接状态,如果连接断开了,则重连
initConnectStatusCheckCommand();
// NettyClient连接服务端
doConnect();
if (!isConnected()) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", cause: Connect wait timeout: " + getTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successful connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", channel is " + this.getChannel());
}
}
reconnectCount.set(0);
reconnectErrorLogFlag.set(false);
} catch (RemotingException e) {
throw e;
} catch (Throwable e) {
throw new RemotingException(this, "Failed connect to server " + getRemoteAddress()
+ " from " + getClass().getSimpleName() + " " + NetUtils.getLocalHost()
+ " using dubbo version " + Version.getVersion()
+ ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
}
}
重连服务端逻辑是先断连,再连接。
对于心跳机制,Netty本身提供了空闲检测:IdleStateHandler,也可以直接基于此实现心跳机制。