HTTP 1.1 规定了默认保持长连接(HTTP persistent connection),数据传输完成时保持TCP连接不断开(不发RST包、不四次握手),等待在同域名下(IP:port)继续用这个通道传输数据,相反的就是短连接。HTTP的 Keep-alive要让一个 TCP连接为持久连接。
心跳检测
用来检测一个系统是否存活或者网络链路是否通畅的一种方式,其一般做法是定时向被检测系统发送心跳包,被检测系统收到心跳包进行回复,收到回复说明对方存活;心跳能够给长连接提供保活功能,能够检测长连接是否正常;
疑问
TCP连接到底是什么?
为了实现数据的可靠传输,由通信双方经过三次握手交互建立的逻辑连接,通信双方都需要维护这样的连接状态信息。比如netstat中连接状态为ESTABLISHED,表示当前处于连接状态。(这里需要注意的是这个ESTABLISHED的连接状态只是操作系统认为当前还处在连接状态)
是不是建立了长连接,就可以高枕无忧了呢?
建立好长连接,两端的操作系统都维护了连接已经建立的状态,是不是这时向对端发送数据一定能到达呢? 答案是否定的。 可能此时链路已经不通,只是TCP层还没有感知到这一信息,操作系统层面显示的状态依然是连接状态,而且因为TCP层还认为连接是ESTABLISHED,所以作为应用层自然也就无法感知当前的链路不通。
这种情况会导致什么问题?
如果此时有数据想要传输,显然,数据是无法传送到对端,但是TCP协议为了保证可靠性,会重传请求,如果问题只是网线接头松了,导致网络不通,此时如果及时将网线接头接好,数据还是能正常到达对端,且TCP的连接依然是ESTABLISHED,不会有任何变化。但不是任何时候都这么巧,有时就是某段链路瘫痪了,或者主机挂了,系统异常关闭了等。这时候如果应用系统不能感知到,是件很危险的事情。
长连接怎么保活?
TCP协议实现中,是有保活机制的,也就是 TCP的KeepAlive机制(此机制并不是TCP协议规范中的内容,由操作系统去实现),KeepAlive机制开启后,在一定时间内(一般时间为7200s,参数tcp_keepalive_time)在链路上没有数据传送的情况下,TCP层将发送相应的KeepAlive探针以确定连接可用性,探测失败后重试10(参数tcp_keepalive_probes)次,每次间隔时间75s(参数tcp_keepalive_intvl),所有探测失败后,才认为当前连接已经不可用。这些参数是机器级别,可以调整。
为什么需要应用层心跳机制?
按照 TCP的KeepAlive机制,默认的参数,显然不能满足要求。那是不是调小点就可以了呢? 调整参数,当然是有用的,但是首先参数的机器级别的,调整起来不太方便,更换机器还得记得调整参数,对系统的使用方来说,未免增加了维护成本,而且很可能忘记;
其次由于KeepAlive的保活机制只在链路空闲的情况下才会起到作用,假如此时有数据发送,且物理链路已经不通,操作系统这边的链路状态还是ESTABLISHED,这时会发生什么?自然会走TCP重传机制,要知道默认的TCP超时重传,指数退避算法也是一个相当长的过程。因此,一个可靠的系统,长连接的保活肯定是要依赖应用层的心跳来保证的。
这里应用层的心跳举个例子,比如客户端每隔3s通过长连接通道发送一个心跳请求到服务端,连续失败5次就断开连接。这样算下来最长15s就能发现连接已经不可用,一旦连接不可用,可以重连,也可以做其他的failover处理,比如请求其他服务器。
应用层心跳还有个好处,比如某台服务器因为某些原因导致负载超高,CPU飙高,或者线程池打满等等,无法响应任何业务请求,如果使用TCP自身的机制无法发现任何问题,然而对客户端而言,这时的最好选择就是断连后重新连接其他服务器,而不是一直认为当前服务器是可用状态,向当前服务器发送一些必然会失败的请求。
Netty 心跳实现 IdleStateHandler
- 服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
1
2
3
4
5
6
7
8
9
10
11
12
ServerBootstrap b= new ServerBootstrap();
b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigge()方法
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringDecoder());
socketChannel.pipeline().addLast(new HeartBeatServerHandler());
}
});
自定义处理类Handler继承ChannlInboundHandlerAdapter,实现其userEventTriggered()方法,在出现超时事件时会被触发,包括读空闲超时或者写空闲超时
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
private int lossConnectCount = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("已经5秒未收到客户端的消息了!");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.READER_IDLE){
lossConnectCount++;
if (lossConnectCount>2){
System.out.println("关闭这个不活跃通道!");
ctx.channel().close();
}
}
}else {
super.userEventTriggered(ctx,evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
lossConnectCount = 0;
System.out.println("client says: " + msg.toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
- 客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
1
2
3
4
5
6
7
8
9
10
11
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;
socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
socketChannel.pipeline().addLast(new StringEncoder());
socketChannel.pipeline().addLast(new HeartBeatClientHandler());
}
});
自定义处理类Handler继承ChannlInboundHandlerAdapter,实现自定义userEventTrigger()方法,如果出现超时时间就会被触发,包括读空闲超时或者写空闲超时;
1
2
3
4
5
6
7
8
9
10
11
12
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
System.out.println("客户端循环心跳监测发送: "+new Date());
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.WRITER_IDLE){
if (curTime<beatTime){
curTime++;
ctx.writeAndFlush("biubiu");
}
}
}
}
IdleStateHandler 源码解析
IdleStateHandler构造器:
1
2
3
4
5
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
-
readerIdleTime读空闲超时时间设定,如果channelRead()方法超过readerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
-
writerIdleTime写空闲超时时间设定,如果write()方法超过writerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;
-
allIdleTime所有类型的空闲超时时间设定,包括读空闲和写空闲;
-
unit时间单位,包括时分秒等
心跳检测也是一种Handler,在启动时添加到ChannelPipeline管道中,当有读写操作时消息在其中传递:
1
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
IdleStateHandler的channelActive()方法在socket通道建立时被触发:
1
2
3
4
5
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
initialize(ctx);
super.channelActive(ctx);
}
channelActive()方法调用Initialize()方法,根据配置的readerIdleTime,WriteIdleTIme等超时事件参数往任务队列taskQueue中添加定时任务task ;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
在管道中传递调用自定义的userEventTrigger()方法:
1
2
3
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
总结
-
IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;
-
Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内有收到客户端的心跳数据包则认为客户端已经下线,将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,服务端都能感知到,而客户端不能感知到服务端的非正常下线;
-
要想实现客户端感知服务端的存活情况,需要进行双向的心跳;Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;