简单地说,ZooKeeper的连接与会话就是客户端通过实例化ZooKeeper对象来实现客户端与服务器创建并保持TCP连接的过程。本质上,Session就是一个TCP 长连接。
Session会话的作用:
Session是ZooKeeper中的会话实体,代表了一个客户端会话。其包含以下4个基本属性。
当客户端和服务端之间的网络连接断开时,ZooKeeper客户端会自动进行反复的重连,直到最终成功连接上ZooKeeper集群中的一台机器。在这种情况下,再次连接上服务端的客户端有可能会处于以下两种状态之一。
当客户端与服务端之间的连接断开后,用户在客户端可能主要会看到两类异常:CONNECTION_LOSS(连接断开)和SESSION_EXPIRED(会话过期)。
连接断开:CONNECTION_LOSS
有时会因为网络闪断导致客户端与服务器断开连接,或是因为客户端当前连接的服务器出现问题导致连接断开,我们统称这类问题为“客户端与服务器连接断开”现象,即CONNECTION_LOSS。在这种情况下,ZooKeeper客户端会自动从地址列表中重新逐个选取新的地址并尝试进行重新连接,直到最终成功连接上服务器。
会话失效:SESSION_EXPIRED
SESSION_EXPIRED是指会话过期,通常发生在CONNECTION_LOSS期间。客户端和服务器连接断开之后,由于重连期间耗时过长,超过了会话超时时间(sessionTimeout)限制后还没有成功连接上服务器,那么服务器认为这个会话已经结束了,就会开始进行会话清理。但是另一方面,该客户端本身不知道会话已经失效,并且其客户端状态还是DISCONNECTED。之后,如果客户端重新连接上了服务器,那么很不幸,服务器会告诉客户端该会话已经失效(SESSION_EXPIRED)。在这种情况下,用户就需要重新实例化一个ZooKeeper对象,并且看应用的复杂情况,重新恢复临时数据。
对于连接断开的场景下,Zk客户端会自动尝试重连其他节点;但是会话失效的场景就需要考虑了,毕竟涉及到临时节点和Watcher,那么影响就会很大的。比如注册中心或是分布式锁的应用场景。
会话失效的情况一般有如下几种情况:
为什么会说到JVM?其实这也是最容易忽略的问题,尤其是Java应用的监控没有上的情况下。首先Zookeeper本身就是一个Java应用,其内存管理是受到了JVM的内存设置限制的。因此,对于这一类托管在JVM上的应用程序,必须考虑到JVM内存设置的问题。
如何解决?
对于失效的场景,比较合适的就是增加了一个监听器;监听session expired事件,并且在事件发生的时候进行处理。什么处理?自然是客户端重新拉起zk连接会话。
package com.xiaoju.dqa.prometheus.client.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SessionConnectionListener implements ConnectionStateListener {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private String path;
private String data;
public SessionConnectionListener(String path, String data) {
this.path = path;
this.data = data;
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState){
if(connectionState == ConnectionState.LOST){
logger.error("[负载均衡失败]zk session超时");
while(true){
try {
if(curatorFramework.getZookeeperClient().blockUntilConnectedOrTimedOut()){
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, data.getBytes("UTF-8"));
logger.info("[负载均衡修复]重连zk成功");
break;
}
} catch (InterruptedException e) {
break;
} catch (Exception e){
}
}
}
}
}
参考
Zookeeper Curator 处理会话过期 Session Expired