Client connection source code analysis
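Clients covered: ZKClient and Curator. Conclusion first:
To create a connection, the Client first creates a ZooKeeper object locally to represent the Server it connects to. Once the connection succeeds, the connection's various transient data are initialized into this zk object. When the connection is closed, the zk object representing the Server is deleted.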
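The commonly used ZK client libraries are ZKClient and Curator. When a client connects to ZK, it is configured with the cluster's address list. The specific server it connects to is chosen round-robin: the configured addresses are first shuffled, then walked in order (this is the default behavior; a reconnection policy can also be specified).
ZKClient source analysis: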
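import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;

public class ZKClientTest {
private static final String CLUSTER = "zkOS:2181";
private static final String PATH = "/mylog";
public static void main(String[] args) {
// Blocks until connected (this constructor waits up to Integer.MAX_VALUE ms)
ZkClient zkClient = new ZkClient(CLUSTER);
zkClient.setZkSerializer(new SerializableSerializer());
CreateMode mode = CreateMode.PERSISTENT;
String data = "first log";
String nodeName = zkClient.create(PATH, data, mode);
// ...
zkClient.close();
}
}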
Let's trace the ZKClient source to see how it connects, starting from the ZkClient constructor:
public class ZkClient implements Watcher {
...
public ZkClient(String serverstring) {
this(serverstring, Integer.MAX_VALUE);
}
public ZkClient(String zkServers, int connectionTimeout) {
this(new ZkConnection(zkServers), connectionTimeout);
}
...
public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
if (zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
}
_connection = zkConnection;
_zkSerializer = zkSerializer;
_operationRetryTimeoutInMillis = operationRetryTimeout;
_isZkSaslEnabled = isZkSaslEnabled();
connect(connectionTimeout, this);
}
public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
boolean started = false;
acquireEventLock();
try {
setShutdownTrigger(false);
_eventThread = new ZkEventThread(_connection.getServers());
_eventThread.start();
_connection.connect(watcher);
LOG.debug("Awaiting connection to Zookeeper server");
boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
if (!waitSuccessful) {
throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
}
started = true;
} finally {
getEventLock().unlock();
if (!started) {
close();
}
}
}
}
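`connect` starts the event thread, asks the connection to connect, then blocks in `waitUntilConnected` until the server reports a connected state or the timeout elapses; if that fails, the `finally` block closes the client. A minimal sketch of this wait-with-deadline pattern, assuming a hypothetical `ConnectWaiter` class whose `onConnected()` is what a Watcher callback would invoke. ZkClient's real implementation uses its own event lock and condition, and `IllegalStateException` here stands in for `ZkTimeoutException`:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of "block until connected or give up"; not ZkClient's actual code.
final class ConnectWaiter {
    private final CountDownLatch connected = new CountDownLatch(1);

    // In a real client, a Watcher would call this on KeeperState.SyncConnected.
    void onConnected() {
        connected.countDown();
    }

    // Same shape as ZkClient.connect: start an asynchronous connect, then wait for it.
    static ConnectWaiter connect(Consumer<ConnectWaiter> startConnecting, long maxMsToWait) {
        ConnectWaiter waiter = new ConnectWaiter();
        startConnecting.accept(waiter);
        boolean ok;
        try {
            ok = waiter.connected.await(maxMsToWait, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            ok = false;
        }
        if (!ok) {
            // ZkClient throws ZkTimeoutException at this point and closes itself in finally.
            throw new IllegalStateException("Unable to connect within " + maxMsToWait + " ms");
        }
        return waiter;
    }
}
```

The latch is released by whichever thread sees the connected event, so the caller's thread is the only one that blocks.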
Tracing the source above shows that ZKClient actually connects through the ZkConnection.connect method, so let's follow ZkConnection next:
public class ZkConnection implements IZkConnection {
...
private ZooKeeper _zk = null;
...
public ZkConnection(String zkServers, int sessionTimeOut) {
_servers = zkServers;
_sessionTimeOut = sessionTimeOut;
}
@Override
public void connect(Watcher watcher) {
_zookeeperLock.lock();
try {
if (_zk != null) {
throw new IllegalStateException("zk client has already been started");
}
try {
LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
_zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
} catch (IOException e) {
throw new ZkException("Unable to connect to " + _servers, e);
}
} finally {
_zookeeperLock.unlock();
}
}
}
Curator source analysis:
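import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class FluentTest {
public static void main(String[] args) throws Exception {
// baseSleepTimeMs = 1000, maxRetries = 3
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("zkOS:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(13000)
.retryPolicy(retryPolicy)
.namespace("logs")
.build();
client.start();
// ...
client.close();
}
}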
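The `ExponentialBackoffRetry(1000, 3)` policy above is the "reconnection policy" mentioned at the start: the sleep between retries grows with the retry count. A sketch of that idea, modeled on how Curator's exponential backoff is commonly described (base sleep multiplied by a random factor that doubles its range on each retry); the class name `BackoffSketch` is hypothetical and the exact formula is an assumption to check against the Curator source for your version:

```java
import java.util.Random;

// Hypothetical exponential-backoff-with-jitter sketch, modeled on (not copied from) Curator.
final class BackoffSketch {
    private static final int MAX_RETRIES_LIMIT = 29; // keeps 1 << (retryCount + 1) positive
    private final int baseSleepTimeMs;
    private final int maxRetries;
    private final Random random;

    BackoffSketch(int baseSleepTimeMs, int maxRetries, Random random) {
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxRetries = Math.min(maxRetries, MAX_RETRIES_LIMIT);
        this.random = random;
    }

    // retryCount starts at 0 for the first retry.
    boolean allowRetry(int retryCount) {
        return retryCount < maxRetries;
    }

    // Sleep = base * a random factor in [1, 2^(retryCount + 1) - 1].
    long sleepMs(int retryCount) {
        if (retryCount < 0 || retryCount > MAX_RETRIES_LIMIT) {
            throw new IllegalArgumentException("retryCount out of range: " + retryCount);
        }
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return (long) baseSleepTimeMs * factor;
    }
}
```

With `(1000, 3)` the first retry sleeps exactly 1000 ms and the third sleeps somewhere between 1000 and 7000 ms; the randomness keeps many clients from reconnecting in lockstep.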
Let's trace the Curator source to see how it connects, starting from client.start():
public class CuratorFrameworkImpl implements CuratorFramework{
...
@Override
public void start(){
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ){
throw new IllegalStateException("Cannot be started more than once");
}
try{
...
this.getConnectionStateListenable().addListener(listener);
client.start();
...
}catch ( Exception e ){
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
}
Focus on the inner client.start() call:
public class CuratorZookeeperClient implements Closeable{
...
public void start() throws Exception
{
log.debug("Starting");
if ( !started.compareAndSet(false, true) )
{
throw new IllegalStateException("Already started");
}
state.start();
}
...
}
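Both `CuratorFrameworkImpl.start()` and `CuratorZookeeperClient.start()` reject a second call with an atomic compare-and-set (`state.compareAndSet(LATENT, STARTED)` and `started.compareAndSet(false, true)`). A minimal sketch of why that guard is race-free, using a hypothetical `StartOnce` class: however many threads call `start()` at the same moment, exactly one wins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical component illustrating the "start at most once" guard.
final class StartOnce {
    private final AtomicBoolean started = new AtomicBoolean(false);

    void start() {
        // Atomically flips false -> true; only the first caller sees true returned.
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        // ... real start-up work would go here ...
    }

    // Lets `threads` callers race to start one instance; returns how many succeeded.
    static int raceStarts(int threads) {
        StartOnce component = new StartOnce();
        CountDownLatch gate = new CountDownLatch(1);
        AtomicInteger successes = new AtomicInteger();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    gate.await();          // release all threads at once
                    component.start();
                    successes.incrementAndGet();
                } catch (IllegalStateException | InterruptedException e) {
                    // Losing callers get IllegalStateException; interruption just ends the thread.
                }
            });
            workers.add(t);
            t.start();
        }
        gate.countDown();
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }
}
```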
Next, follow state.start():
class ConnectionState implements Watcher, Closeable{
...
void start() throws Exception{
log.debug("Starting");
ensembleProvider.start();
reset();
}
synchronized void reset() throws Exception{
log.debug("reset");
instanceIndex.incrementAndGet();
isConnected.set(false);
connectionStartMs = System.currentTimeMillis();
handleHolder.closeAndReset();
handleHolder.getZooKeeper();
}
}
The key point is the handleHolder.getZooKeeper() method:
class HandleHolder{
...
ZooKeeper getZooKeeper() throws Exception{
return (helper != null) ? helper.getZooKeeper() : null;
}
...
}
class Helper{
private final Data data;
...
ZooKeeper getZooKeeper() throws Exception{
return data.zooKeeperHandle;
}
...
}
The handle is simply read from data. So when is the Helper created? Go back to org.apache.curator.ConnectionState#reset and look at the handleHolder.closeAndReset() method:
class HandleHolder{
...
void closeAndReset() throws Exception{
internalClose(0);
Helper.Data data = new Helper.Data();
helper = new Helper(data){
@Override
ZooKeeper getZooKeeper() throws Exception{
synchronized(this){
if ( data.zooKeeperHandle == null ){
resetConnectionString(ensembleProvider.getConnectionString());
data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
}
helper = new Helper(data);
return super.getZooKeeper();
}
}
};
}
...
}
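`closeAndReset` installs a Helper whose first `getZooKeeper()` call builds the handle under a lock and then replaces `helper` with a plain `Helper(data)`, so every later call just reads the field without synchronizing. A generic sketch of that "lazy first call, then swap" pattern; the names `LazyHolder`, `reset` and `get` are hypothetical, and a `Supplier` stands in for `zookeeperFactory.newZooKeeper(...)`:

```java
import java.util.function.Supplier;

// Hypothetical generic version of HandleHolder's lazy-create-then-swap trick.
final class LazyHolder<T> {
    // Shared state, like Helper.Data: holds the handle once created.
    private static final class Data<T> {
        T handle;
    }

    private interface Helper<T> {
        T get();
    }

    private final Supplier<T> factory;
    private volatile Helper<T> helper; // volatile so the swapped-in helper is visible to all threads

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
        reset();
    }

    // Like closeAndReset: install a helper whose first get() creates the handle.
    void reset() {
        Data<T> data = new Data<>();
        helper = new Helper<T>() {
            @Override
            public synchronized T get() {
                if (data.handle == null) {
                    data.handle = factory.get();
                }
                // Swap in a cheap helper over the same data; later calls skip the lock.
                helper = () -> data.handle;
                return data.handle;
            }
        };
    }

    T get() {
        return helper.get();
    }
}
```

Note that nothing is created until the first `get()`, which matches `ConnectionState.reset()` calling `closeAndReset()` and then `getZooKeeper()` to force creation.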
Let's see how data.zooKeeperHandle is actually created:
public class NonAdminZookeeperFactory implements ZookeeperFactory{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception{
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
}
}
As we can see, whichever client library is used, it ultimately creates a ZooKeeper object locally. Next we analyze the ZooKeeper object in the ZK source.
The client object ZooKeeper in the ZK source
Here is the ZooKeeper class from the ZK source (its constructors):
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
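ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
creates a parser for the zk cluster connection string. It splits off the optional chroot path, turns each parsed ip and port into an address instance, and adds it to a cached collection (serverAddresses):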
public ConnectStringParser(String connectString) {
int off = connectString.indexOf('/');
if (off >= 0) {
String chrootPath = connectString.substring(off);
if (chrootPath.length() == 1) {
this.chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
}
connectString = connectString.substring(0, off);
} else {
this.chrootPath = null;
}
String hostsList[] = connectString.split(",");
for (String host : hostsList) {
int port = DEFAULT_PORT;
int pidx = host.lastIndexOf(':');
if (pidx >= 0) {
if (pidx < host.length() - 1) {
port = Integer.parseInt(host.substring(pidx + 1));
}
host = host.substring(0, pidx);
}
serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
}
}
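To experiment with the parsing rules without ZooKeeper on the classpath, here is a standalone re-implementation of the constructor above. The class name `ConnectStringSketch` is hypothetical; `DEFAULT_PORT = 2181` is ZooKeeper's default client port, and the `PathUtils.validatePath` check is omitted:

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone copy of ConnectStringParser's logic (no chroot validation).
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath; // null when absent or when it is just "/"
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            // A bare "/" means "no chroot".
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String host : connectString.split(",")) {
            int port = DEFAULT_PORT;
            int pidx = host.lastIndexOf(':');
            if (pidx >= 0) {
                // "host:" with nothing after the colon keeps the default port.
                if (pidx < host.length() - 1) {
                    port = Integer.parseInt(host.substring(pidx + 1));
                }
                host = host.substring(0, pidx);
            }
            // createUnresolved: no DNS lookup happens at parse time.
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```

For example, `"zk1:2181,zk2,zk3:2182/app/logs"` yields three unresolved addresses (`zk2` gets port 2181) and the chroot `/app/logs`; the chroot applies to the whole ensemble, which is why it appears only once, after the last host.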
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
creates the host provider, which shuffles the addresses taken from that cached collection:
public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
this.resolver = new Resolver() {
@Override
public InetAddress[] getAllByName(String name) throws UnknownHostException {
return InetAddress.getAllByName(name);
}
};
init(serverAddresses);
}
private void init(Collection<InetSocketAddress> serverAddresses) {
if (serverAddresses.isEmpty()) {
throw new IllegalArgumentException(
"A HostProvider may not be empty!");
}
this.serverAddresses.addAll(serverAddresses);
Collections.shuffle(this.serverAddresses);
}
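The shuffle is there for load balancing: without it, every client would walk the list in the same order and connect to the first server.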
cnxn = new ClientCnxn(…);
creates a connection instance.
cnxn.start();
starts the connection:
public void start() {
sendThread.start();
eventThread.start();
}
Now look at the run method of sendThread, the connection thread started above:
public void run() {
clientCnxnSocket.introduce(this,sessionId);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000;
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
}
if (closing || !state.isAlive()) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
if (state.isConnected()) {
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent == true) {
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
authState,null));
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv()
+ "ms"
+ " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout =
Math.min(2*pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
}
break;
} else {
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()),
serverAddress,
RETRY_CONN_MSG,
e);
}
cleanup();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
Event.EventType.None,
Event.KeeperState.Disconnected,
null));
}
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
}
}
cleanup();
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x"
+ Long.toHexString(getSessionId()));
}
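The loop's timing decisions are plain arithmetic: while connected, the budget is `readTimeout - idleRecv`, otherwise `connectTimeout - idleRecv`, and a budget of zero or less raises `SessionTimeoutException`; a ping is sent once roughly half the read timeout has passed without sending. The sketch below extracts that arithmetic into pure functions (hypothetical class `SendLoopTiming`). It also assumes, from ClientCnxn's constructor in ZooKeeper 3.4.x (not shown above), that `readTimeout = sessionTimeout * 2 / 3` and `connectTimeout = sessionTimeout / serverCount`:

```java
// Hypothetical pure-function extract of SendThread.run()'s timing logic.
final class SendLoopTiming {
    static final int MAX_SEND_PING_INTERVAL = 10000;

    // Assumption: values as set in ClientCnxn's constructor (ZooKeeper 3.4.x).
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    static int connectTimeout(int sessionTimeout, int serverCount) {
        return sessionTimeout / serverCount;
    }

    // Time left before the client gives up on this server; <= 0 means "timed out".
    static int remaining(boolean connected, int readTimeout, int connectTimeout, int idleRecv) {
        return (connected ? readTimeout : connectTimeout) - idleRecv;
    }

    // Same expression as in run(): half the read timeout, minus time since the last send,
    // minus an extra second once more than a second has passed without sending.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }
}
```

With a 15000 ms session and 3 servers, readTimeout is 10000 ms and connectTimeout is 5000 ms. A connected client that has heard nothing for 3000 ms has 7000 ms left; if it has also sent nothing for 4000 ms, `timeToNextPing` is 5000 - 4000 - 1000 = 0, so it pings.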
while (state.isAlive()) checks whether the current connection object is still alive (neither CLOSED nor AUTH_FAILED):
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
CLOSED, AUTH_FAILED, NOT_CONNECTED;
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
public boolean isConnected() {
return this == CONNECTED || this == CONNECTEDREADONLY;
}
}
serverAddress = hostProvider.next(1000);
gets the address of the zkServer to connect to:
public InetSocketAddress next(long spinDelay) {
currentIndex = ++currentIndex % serverAddresses.size();
if (currentIndex == lastIndex && spinDelay > 0) {
try {
Thread.sleep(spinDelay);
} catch (InterruptedException e) {
LOG.warn("Unexpected exception", e);
}
} else if (lastIndex == -1) {
lastIndex = 0;
}
InetSocketAddress curAddr = serverAddresses.get(currentIndex);
try {
String curHostString = getHostString(curAddr);
List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
if (resolvedAddresses.isEmpty()) {
return curAddr;
}
Collections.shuffle(resolvedAddresses);
return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
} catch (UnknownHostException e) {
return curAddr;
}
}
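`next` walks the shuffled list cyclically; when `currentIndex` comes back around to `lastIndex` it sleeps `spinDelay` before trying again, so a client that cannot reach any server does not spin. A simplified sketch of the shuffle plus round-robin behavior, with hypothetical names (`RoundRobinHosts`, the `sleeps` counter) and without the DNS re-resolution step. The `onConnected` method reflects StaticHostProvider's `onConnected()` in the ZooKeeper source (not shown above), which records the index of the server that worked:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical simplified StaticHostProvider: shuffle once, then round-robin.
final class RoundRobinHosts {
    private final List<String> hosts;
    private int currentIndex = -1;
    private int lastIndex = -1;
    int sleeps = 0; // how many times next() paused for spinDelay

    RoundRobinHosts(List<String> hosts, Random random) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("A HostProvider may not be empty!");
        }
        this.hosts = new ArrayList<>(hosts);
        // Each client shuffles independently, spreading first attempts across servers.
        Collections.shuffle(this.hosts, random);
    }

    String next(long spinDelayMs) {
        currentIndex = (currentIndex + 1) % hosts.size();
        if (currentIndex == lastIndex && spinDelayMs > 0) {
            // Came all the way round without a new successful connection: back off.
            sleeps++;
            try {
                Thread.sleep(spinDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } else if (lastIndex == -1) {
            // Very first call: remember where the walk started.
            lastIndex = 0;
        }
        return hosts.get(currentIndex);
    }

    // Called after a successful connection, like StaticHostProvider.onConnected().
    void onConnected() {
        lastIndex = currentIndex;
    }
}
```

With three hosts, the first three calls return each host once without sleeping; the fourth call returns the first host again after one spin-delay pause.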
startConnect(serverAddress);
starts a connection attempt (it may fail; if it does, the loop fetches the next address and tries again):
private void startConnect(InetSocketAddress addr) throws IOException {
saslLoginFailed = false;
state = States.CONNECTING;
setName(getName().replaceAll("\\(.*\\)",
"(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled()) {
try {
String principalUserName = System.getProperty(
ZK_SASL_CLIENT_USERNAME, "zookeeper");
zooKeeperSaslClient =
new ZooKeeperSaslClient(
principalUserName+"/"+addr.getHostName());
} catch (LoginException e) {
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
+ "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
saslLoginFailed = true;
}
}
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
clientCnxnSocket.connect(addr); connects to the given address:
@Override
void connect(InetSocketAddress addr) throws IOException {
SocketChannel sock = createSock();
try {
registerAndConnect(sock, addr);
} catch (IOException e) {
LOG.error("Unable to open socket to " + addr);
sock.close();
throw e;
}
initialized = false;
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
boolean immediateConnect = sock.connect(addr);
if (immediateConnect) {
sendThread.primeConnection();
}
}
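`registerAndConnect` uses non-blocking NIO: the channel is registered for `OP_CONNECT`, and only if `connect` completes immediately is `primeConnection()` called right away; otherwise the handshake is finished later, when the send thread's `doTransport` sees the key become connectable. A standalone sketch of the same sequence using only `java.nio`; the class name, the timeout loop, and the inline `finishConnect` step are illustrative assumptions, not ZooKeeper's code:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical non-blocking connect in the style of registerAndConnect.
final class NioConnectSketch {
    // Returns a connected channel, or throws IOException if not connected within timeoutMs.
    static SocketChannel connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        SocketChannel sock = SocketChannel.open();
        try (Selector selector = Selector.open()) {
            sock.configureBlocking(false); // a channel must be non-blocking to register with a selector
            SelectionKey key = sock.register(selector, SelectionKey.OP_CONNECT);
            if (sock.connect(addr)) {
                return sock; // immediate connect: ZooKeeper calls primeConnection() here
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                long wait = Math.max(1, deadline - System.currentTimeMillis());
                // Deferred case: wait for OP_CONNECT readiness, then complete the handshake.
                if (selector.select(wait) > 0 && key.isConnectable() && sock.finishConnect()) {
                    return sock;
                }
                selector.selectedKeys().clear();
            }
            throw new IOException("Timed out connecting to " + addr);
        } catch (IOException e) {
            sock.close(); // like connect(): close the socket on failure and rethrow
            throw e;
        }
    }
}
```

Closing the selector only cancels the registration; the returned channel stays open and connected.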
Server-side connection source code analysis
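ZooKeeper session theory
The session is one of the most important concepts in zk: every interaction between client and server is tied to a session.
When a ZooKeeper client starts, it first establishes a long-lived TCP connection with a zk server. Once the connection is established, the life cycle of the client session begins.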
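(1) Session states
There are three common session states:
CONNECTING: connecting. To create a connection, the Client first creates a zk object locally to represent the Server it connects to.
CONNECTED: connected. Once the connection succeeds, the connection's various transient data are initialized into the zk object.
CLOSED: closed. After the connection is closed, the zk object representing the Server is deleted.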
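(2) Session connection-timeout management (maintained by the client)
Connection-timeout management here refers to the client's own record of how long it has been trying to connect to the server; the clock starts when the current session first initiates a connection to the server.
ZK is a CP system: while the servers are synchronizing data they do not serve clients, but this process is very fast. Within the connection timeout the client keeps retrying until it succeeds, so the period during which the server is not serving is invisible to the client.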
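(3) Session connection events
When the long-lived connection between client and server fails, the client reconnects. During reconnection the client can produce three session connection events:
CONNECTION_LOSS: connection lost.
SESSION_MOVED: session moved. If the client reconnects to a Server within the client's connection-timeout limit, and that Server is not the one it was connected to before (another machine in the cluster), the session moves.
SESSION_EXPIRED: session expired. If the client reconnects to a Server only after the connection-timeout limit has passed, and the Server has already removed this session's sessionId, the session is expired.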
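(4) Session idle-timeout management (maintained by the server)
The connection timeout is a client-side concern; the session idle timeout is a server-side concern.
For every client session, the server records how long the session has been idle since its last interaction, and the point in time at which the session will hit its idle timeout, counted from the end of that last interaction. Once the idle time exceeds the timeout, the server removes the session's SessionId. This is why a client must periodically send heartbeats to the server while idle: to keep the long-lived session alive. The server uses idle-timeout management to decide whether a session has been interrupted.
For session idle-timeout management, the server uses a special approach: the bucketing strategy.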
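A. Bucketing strategy
Bucketing means putting sessions whose idle-timeout times are close together into the same bucket and managing them as a group, which reduces management complexity. When checking for timeouts, only the sessions remaining in a bucket need to be considered: a session that has not timed out has already been moved out of the bucket (into a later one each time it is touched), so every session still in the bucket has timed out.
zk's idle-timeout management is not exact: the timeout handling does not run the instant a session times out, but only when its bucket's time is reached.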
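B. Bucketing basis
The bucket is computed from the following quantities:
CurrentTime: the current time (a point on the timeline)
SessionTimeout: the session timeout (a duration)
ExpirationTime: the point in time at which the session will next time out (a point on the timeline)
ExpirationInterval: the bucket size (a duration)
BucketTime: the bucket containing the session's next timeout point
$$ExpirationTime = CurrentTime + SessionTimeout$$
$$BucketTime = \left( \left\lfloor \frac{ExpirationTime}{ExpirationInterval} \right\rfloor + 1 \right) \times ExpirationInterval$$
This is what touchSession computes below with roundToInterval(Time.currentElapsedTime() + timeout). From these formulas, one bucket spans one ExpirationInterval. All sessions whose ExpirationTime falls into the same bucket have their timeouts handled together.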
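A worked example of the bucket formula helps make the rounding concrete. In the SessionTrackerImpl code analyzed below, ExpirationInterval is set to `tickTime`; the sketch assumes a tickTime of 2000 ms, and the class name `BucketMath` is hypothetical while `roundToInterval` mirrors the source method of the same name:

```java
// Hypothetical helper reproducing the bucket formula.
final class BucketMath {
    // Same arithmetic as SessionTrackerImpl.roundToInterval: round *up* to the next bucket boundary.
    static long roundToInterval(long time, long expirationInterval) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // ExpirationTime = CurrentTime + SessionTimeout; BucketTime = roundToInterval(ExpirationTime).
    static long bucketTime(long currentTime, long sessionTimeout, long expirationInterval) {
        long expirationTime = currentTime + sessionTimeout;
        return roundToInterval(expirationTime, expirationInterval);
    }
}
```

With ExpirationInterval = 2000: a session touched at 10300 with a 5000 ms timeout expires at 15300, and 15300 / 2000 = 7, so its bucket is (7 + 1) × 2000 = 16000. A session touched at 11900 with a 4000 ms timeout (expiring at 15900) lands in the same 16000 bucket. An ExpirationTime that is an exact multiple, such as 16000, rounds up to 18000, so a session is never expired early.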
Server-side connection source code analysis
Find the ZooKeeperServer.startup method, which is triggered once the Server starts:
public synchronized void startup() {
if (sessionTracker == null) {
createSessionTracker();
}
startSessionTracker();
setupRequestProcessors();
registerJMX();
setState(State.RUNNING);
notifyAll();
}
createSessionTracker();
creates a sessionTracker (session tracker) thread:
protected void createSessionTracker() {
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1, getZooKeeperServerListener());
}
public SessionTrackerImpl(SessionExpirer expirer,
ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
long sid, ZooKeeperServerListener listener)
{
super("SessionTracker", listener);
this.expirer = expirer;
this.expirationInterval = tickTime;
this.sessionsWithTimeout = sessionsWithTimeout;
nextExpirationTime = roundToInterval(Time.currentElapsedTime());
this.nextSessionId = initializeNextSession(sid);
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
}
private long roundToInterval(long time) {
return (time / expirationInterval + 1) * expirationInterval;
}
synchronized public void addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
if (sessionsById.get(id) == null) {
SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
sessionsById.put(id, s);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- Adding session 0x"
+ Long.toHexString(id) + " " + sessionTimeout);
}
} else {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- Existing session 0x"
+ Long.toHexString(id) + " " + sessionTimeout);
}
}
touchSession(id, sessionTimeout);
}
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout, long expireTime) {
this.sessionId = sessionId;
this.timeout = timeout;
this.tickTime = expireTime;
isClosing = false;
}
...
}
synchronized public boolean touchSession(long sessionId, int timeout) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.CLIENT_PING_TRACE_MASK,
"SessionTrackerImpl --- Touch session: 0x"
+ Long.toHexString(sessionId) + " with timeout " + timeout);
}
SessionImpl s = sessionsById.get(sessionId);
if (s == null || s.isClosing()) {
return false;
}
long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
if (s.tickTime >= expireTime) {
return true;
}
SessionSet set = sessionSets.get(s.tickTime);
if (set != null) {
set.sessions.remove(s);
}
s.tickTime = expireTime;
set = sessionSets.get(s.tickTime);
if (set == null) {
set = new SessionSet();
sessionSets.put(expireTime, set);
}
set.sessions.add(s);
return true;
}
static class SessionSet {
HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}
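`touchSession` recomputes the session's bucket from "now + timeout"; if the new bucket is later than the current one, it removes the session from the old SessionSet and adds it to the new one. A simplified sketch of that move, with hypothetical names: session ids stand in for `SessionImpl`, a map of id to bucket time plays the role of `SessionImpl.tickTime`, the clock is passed in as `now` for determinism, and unknown sessions are added rather than rejected (the real method returns `false` for them):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified touchSession: move a session to the bucket of its new expiry.
final class BucketTouchSketch {
    final long expirationInterval;
    final Map<Long, Long> expireTimeById = new HashMap<>();   // like SessionImpl.tickTime
    final Map<Long, Set<Long>> sessionSets = new HashMap<>(); // bucket time -> session ids

    BucketTouchSketch(long expirationInterval) {
        this.expirationInterval = expirationInterval;
    }

    long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    void touch(long sessionId, long timeout, long now) {
        long expireTime = roundToInterval(now + timeout);
        Long old = expireTimeById.get(sessionId);
        if (old != null) {
            if (old >= expireTime) {
                return; // already in this bucket (or a later one): nothing to move
            }
            Set<Long> oldSet = sessionSets.get(old);
            if (oldSet != null) {
                oldSet.remove(sessionId);
            }
        }
        expireTimeById.put(sessionId, expireTime);
        sessionSets.computeIfAbsent(expireTime, k -> new HashSet<>()).add(sessionId);
    }
}
```

This is why a bucket that comes due contains only expired sessions: every session that interacted in time has already been moved to a later bucket.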
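The touchSession call above happens while the sessionTracker is being created (through addSession), but touchSession is also called in many other situations, for example:
When a session interacts with the current Server: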
void touch(ServerCnxn cnxn) throws MissingSessionException {
if (cnxn == null) {
return;
}
long id = cnxn.getSessionId();
int to = cnxn.getSessionTimeout();
if (!sessionTracker.touchSession(id, to)) {
throw new MissingSessionException(
"No session with sessionid 0x" + Long.toHexString(id)
+ " exists, probably expired and removed");
}
}
When the client sends a new connection request after its connection has been lost:
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
finishSessionInit(cnxn, rc);
}
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
try {
if (valid) {
serverCnxnFactory.registerConnection(cnxn);
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
try {
ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
: 0, valid ? cnxn.getSessionId() : 0,
valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
if (!cnxn.isOldClient) {
bos.writeBool(
this instanceof ReadOnlyZooKeeperServer, "readOnly");
}
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
cnxn.sendBuffer(bb);
if (!valid) {
LOG.info("Invalid session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " for client "
+ cnxn.getRemoteSocketAddress()
+ ", probably expired");
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
} else {
LOG.info("Established session 0x"
+ Long.toHexString(cnxn.getSessionId())
+ " with negotiated timeout " + cnxn.getSessionTimeout()
+ " for client "
+ cnxn.getRemoteSocketAddress());
cnxn.enableRecv();
}
} catch (Exception e) {
LOG.warn("Exception while establishing session, closing", e);
cnxn.close();
}
}
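`finishSessionInit` frames its response with a small trick: it writes a placeholder `-1` for `len`, serializes the body, wraps the bytes, and then `bb.putInt(bb.remaining() - 4).rewind()` overwrites the first four bytes with the body length. A sketch of the same framing with plain `java.io`/`java.nio`, assuming a hypothetical `LengthPrefixSketch` class and a raw byte array standing in for the serialized `ConnectResponse` plus `readOnly` flag:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical sketch of "write a placeholder length, then patch it".
final class LengthPrefixSketch {
    static ByteBuffer frame(byte[] body) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1); // placeholder, like bos.writeInt(-1, "len")
            out.write(body);  // stands in for rsp.serialize(...) and the readOnly flag
            out.flush();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // Position is 0, so putInt overwrites the placeholder with the body length;
            // rewind so the whole frame (prefix + body) is sent.
            bb.putInt(bb.remaining() - 4).rewind();
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Both `DataOutputStream` and a default `ByteBuffer` are big-endian, so the patched prefix is read back consistently on the other end.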
The code above only covers creating buckets and moving sessions between them; it does not cover expiry. Expiry is handled in startSessionTracker():
startSessionTracker();
starts the sessionTracker thread. Here is SessionTrackerImpl's run method:
@Override
synchronized public void run() {
try {
while (running) {
currentTime = Time.currentElapsedTime();
if (nextExpirationTime > currentTime) {
this.wait(nextExpirationTime - currentTime);
continue;
}
SessionSet set;
set = sessionSets.remove(nextExpirationTime);
if (set != null) {
for (SessionImpl s : set.sessions) {
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
nextExpirationTime += expirationInterval;
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
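`run()` sleeps until `nextExpirationTime`, removes that whole bucket, expires every session in it, and advances by one `expirationInterval`. A sketch of the same sweep driven by a simulated clock instead of `wait()`, with hypothetical names (`ExpirySweepSketch`, `add`, `advanceTo`) and `expire()` replaced by collecting the expired ids. It also shows the imprecision described earlier: a session due at 15300 is only expired when the sweep reaches the 16000 boundary.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical simplified SessionTrackerImpl.run() with a simulated clock.
final class ExpirySweepSketch {
    private final long expirationInterval;
    private final Map<Long, Set<Long>> sessionSets = new HashMap<>(); // bucket time -> session ids
    private long nextExpirationTime;

    ExpirySweepSketch(long expirationInterval, long startTime) {
        this.expirationInterval = expirationInterval;
        this.nextExpirationTime = roundToInterval(startTime);
    }

    private long roundToInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

    // Place a session in the bucket for now + timeout (bucket moves are touchSession's job).
    void add(long sessionId, long timeout, long now) {
        sessionSets.computeIfAbsent(roundToInterval(now + timeout), k -> new HashSet<>()).add(sessionId);
    }

    // Process every bucket boundary up to `now`, as run() does each time wait() returns.
    List<Long> advanceTo(long now) {
        List<Long> expired = new ArrayList<>();
        while (nextExpirationTime <= now) {
            Set<Long> set = sessionSets.remove(nextExpirationTime);
            if (set != null) {
                expired.addAll(set); // real code: setSessionClosing(id) + expirer.expire(s)
            }
            nextExpirationTime += expirationInterval;
        }
        return expired;
    }
}
```

Because every bucket time is a multiple of `expirationInterval`, stepping `nextExpirationTime` by that interval visits each bucket exactly once, and the tracker only needs one lookup per boundary rather than a scan of all sessions.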