https://cloud.tencent.com/developer/article/1953996?areaSource=104001.94&traceId=7WZNP412yK3vh7ebw4th0
可承遇到,不知什么原因,一个夜晚,机房中,大片的远程调用连接断开。
第二天早上,用户访问高峰,大部分服务器都在获取连接,造成大片网络阻塞。
服务崩溃,惨不忍睹的景象。
本文将从长连接和短连接的概念切入,再到长连接与短连接的区别,以及应用场景,引出心跳机制和断线重连,给出代码实现。
从原理到实践杜绝此类现象。
概念
client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。
这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。
短连接的优缺点
管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。
使用场景
通常浏览器访问服务器的时候就是短连接。
对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的
如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。
所以对于并发量大,请求频率低的,建议使用短连接。
什么是长连接
client向server发起连接,server接受client连接,双方建立连接。
Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。
长连接的生命周期
正常情况下,一条TCP长连接建立后,只要双不提出关闭请求并且不出现异常情况,这条连接是一直存在的.
操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。
所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。
客户端和服务单可一直使用该连接进行数据通信。
长连接的优点
长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,
当发生错误时,可以在不关闭连接的情况下进行提示,
减少CPU及内存的使用,因为不需要经常的建立及关闭连接。
长连接的缺点
连接数过多时,影响服务端的性能和并发数量。
使用场景
数据库的连接就是采用TCP长连接.
RPC,远程服务调用,在服务器,一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。
总结
1.对于长连接和短连接的使用是需要根据应用场景来判断的
2.长连接并不是万能的,也是需要维护的,
应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。
并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。
在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项:SO_KEEPALIVE。
系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。
而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。
因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等,
会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的.
心跳机制即可解决此类问题。
默认KeepAlive状态是不打开的。
需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,
并且可以设置三个参数:
tcp_keepalive_time ,tcp_keepalive_probes , tcp_keepalive_intvl,
分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。
很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。
典型做法是LRU,把最久没有数据的连接给T掉。
通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。
两种方式实现心跳机制:
虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:
使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,
本文的主要介绍应用层方面实现心跳机制,使用netty实现心跳和断线重连。
netty对心跳机制提供了机制,实现的关键是IdleStateHandler先来看一下他的构造函数
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
实例化一个 IdleStateHandler 需要提供三个参数:
1. 客户端成功连接服务端。
2.在客户端中的ChannelPipeline中加入IdleStateHandler,设置写事件触发事件为5s.
3.客户端超过5s未写数据,触发写事件,向服务端发送心跳包,
4.同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,减轻服务端的压力
5.超过三次,1过0s服务端都会收到来自客户端的心跳信息,服务端可以认为客户端挂了,可以close链路。
6.客户端恢复正常,发现链路已断,重新连接服务端。
服务端handler:
package com.heartbreak.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.util.Random;
/**
* @author janti
* @date 2018/6/10 12:21
*/
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
// 失败计数器:未收到client端发送的ping请求
private int unRecPingTimes = 0;
// 定义服务端没有收到心跳消息的最大次数
private static final int MAX_UN_REC_PING_TIMES = 3;
private Random random = new Random(System.currentTimeMillis());
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
if (msg!=null && msg.equals("Heartbeat")){
System.out.println("客户端"+ctx.channel().remoteAddress()+"--心跳信息--");
}else {
System.out.println("客户端----请求消息----:"+msg);
String resp = "商品的价格是:"+random.nextInt(1000);
ctx.writeAndFlush(resp);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state()==IdleState.READER_IDLE){
System.out.println("===服务端===(READER_IDLE 读超时)");
// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
System.out.println("===服务端===(读超时,关闭chanel)");
// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
ctx.close();
} else {
// 失败计数器加1
unRecPingTimes++;
}
}else {
super.userEventTriggered(ctx,evt);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
System.out.println("一个客户端已连接");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
System.out.println("一个客户端已断开连接");
}
}
服务端server:
package com.heartbreak.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* @author tangj
* @date 2018/6/10 10:46
*/
public class HeartBeatServer {
private static int port = 9817;
public HeartBeatServer(int port) {
this.port = port;
}
ServerBootstrap bootstrap = null;
ChannelFuture f;
// 检测chanel是否接受过心跳数据时间间隔(单位秒)
private static final int READ_WAIT_SECONDS = 10;
public static void main(String args[]) {
HeartBeatServer heartBeatServer = new HeartBeatServer(port);
heartBeatServer.startServer();
}
public void startServer() {
EventLoopGroup bossgroup = new NioEventLoopGroup();
EventLoopGroup workergroup = new NioEventLoopGroup();
try {
bootstrap = new ServerBootstrap();
bootstrap.group(bossgroup, workergroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HeartBeatServerInitializer());
// 服务器绑定端口监听
f = bootstrap.bind(port).sync();
System.out.println("server start ,port: "+port);
// 监听服务器关闭监听,此方法会阻塞
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossgroup.shutdownGracefully();
workergroup.shutdownGracefully();
}
}
private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 监听读操作,读超时时间为5秒,超过5秒关闭channel;
pipeline.addLast("ping",