2021SC@SDUSC
客户端创建 Socket 连接后,会尝试与服务器连接,如果连接成功,则与服务器之间形成Session。
前言
在ZooKeeper中,客户端和服务端之间的会话是怎样创建、检查更新、删除以及被如何维护的,将再接下来的文章中进行介绍和分析。
SessionTrackerImpl
ZooKeeper 在 SessionTrackerImpl 中实现了Session的各种操作:创建session,检查更新session、删除session等。而本篇文章将对该类进行简单概述。 SessionTrackerImpl 主要是来维护客户端和服务器之间的session。SessionTrackerImpl实现了SessionTracker,同时也继承了ZooKeeperCriticalThread。即SessionTrackerImpl也是一个线程。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {}
属性
在服务端通过 SessionTrackerImpl 和 ExpiryQueue 来保存Session会话信息。 SessionTrackerImpl属性:
protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<Long, SessionImpl>();
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
private final AtomicLong nextSessionId = new AtomicLong();
private final SessionExpirer expirer;
ExpiryQueue属性:
private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
private final AtomicLong nextExpirationTime = new AtomicLong();
private final int expirationInterval;
run()线程机制
SessionTrackerImpl的run()与之前文章NIOServerCnxnFactory源码分析中ConnectionExpirerThread内的run()类似。
public void run() {
try {
while (running) {
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (SessionImpl s : sessionExpiryQueue.poll()) {
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
}
}
} catch (InterruptedException e) {
handleException(this.getName(), e);
}
LOG.info("SessionTrackerImpl exited loop!");
}
initializeNextSessionId生成会话ID
初始化下一次会话的ID,以便客户端发起连接请求时能够获取不冲突且正确的会话ID。
public static long initializeNextSessionId(long id) {
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
++nextSid;
}
return nextSid;
}
touchSession确定ticktime
touchSession是每次接收到客户端的消息之后就进行的操作,其主要是延长对应的session的超时的时间.
public synchronized boolean touchSession(long sessionId, int timeout) {
SessionImpl s = sessionsById.get(sessionId);
if (s == null) {
logTraceTouchInvalidSession(sessionId, timeout);
return false;
}
if (s.isClosing()) {
logTraceTouchClosingSession(sessionId, timeout);
return false;
}
updateSessionExpiry(s, timeout);
return true;
}
基本方法
public long createSession(int sessionTimeout) {
long sessionId = nextSessionId.getAndIncrement();
trackSession(sessionId, sessionTimeout);
return sessionId;
}
public synchronized void removeSession(long sessionId) {
LOG.debug("Removing session 0x{}", Long.toHexString(sessionId));
SessionImpl s = sessionsById.remove(sessionId);
sessionsWithTimeout.remove(sessionId);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
ZooTrace.SESSION_TRACE_MASK,
"SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId));
}
if (s != null) {
sessionExpiryQueue.remove(s);
}
}
总结
本章对会话的维护进行了简单分析,下一章将对会话的创建进行细节分析。
|