IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ZooKeeper源码分析之Session(一) -> 正文阅读

[大数据]ZooKeeper源码分析之Session(一)

2021SC@SDUSC

客户端创建 Socket 连接后,会尝试与服务器连接,如果连接成功,则与服务器之间形成Session。

前言

在ZooKeeper中,客户端和服务端之间的会话是怎样创建、检查更新、删除以及被如何维护的,将再接下来的文章中进行介绍和分析。

SessionTrackerImpl

ZooKeeper 在 SessionTrackerImpl 中实现了Session的各种操作:创建session,检查更新session、删除session等。而本篇文章将对该类进行简单概述。
SessionTrackerImpl 主要是来维护客户端和服务器之间的session。SessionTrackerImpl实现了SessionTracker,同时也继承了ZooKeeperCriticalThread。即SessionTrackerImpl也是一个线程。

/**
  *按记号间隔分组跟踪会话。
  *总是将tick的间隔取整,以提供某种宽限期。
  *因此,会话将在由在给定时间间隔内过期的会话组成的批中过期。 
 */
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {}

属性

在服务端通过 SessionTrackerImpl 和 ExpiryQueue 来保存Session会话信息。
SessionTrackerImpl属性:

    // 存储着会话id
    protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = new ConcurrentHashMap<Long, SessionImpl>();
    // 队列存储失效的会话
    private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
    // 存储超时会话
    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
    // 下一个会话id
    private final AtomicLong nextSessionId = new AtomicLong();
    // 失效的会话
    private final SessionExpirer expirer;

ExpiryQueue属性:

   // E即是session实例对象,long为失效时间
   private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
   // 当前失效时间的session集合
   private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
   // 下一次失效时间
   private final AtomicLong nextExpirationTime = new AtomicLong();
   // 失效时间间隔
   private final int expirationInterval;
   

run()线程机制

SessionTrackerImpl的run()与之前文章NIOServerCnxnFactory源码分析中ConnectionExpirerThread内的run()类似。

    public void run() {
        try {
            while (running) {
                long waitTime = sessionExpiryQueue.getWaitTime();//等待时间
                if (waitTime > 0) {// 如果等待时间大于0
                    Thread.sleep(waitTime);// 当前线程睡眠,等待时间到下一次失效时间
                    continue;
                }

                for (SessionImpl s : sessionExpiryQueue.poll()) {// 下次失效时间对应的session关闭
                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);// 进行closeSession操作
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

initializeNextSessionId生成会话ID

初始化下一次会话的ID,以便客户端发起连接请求时能够获取不冲突且正确的会话ID。

    /**
     * 生成初始会话ID. 高阶1字节是serverId
     * 下5个字节来自timestamp ,低阶2字节是0s。
     * 使用“>>>8”,而不是“>>8”,以确保高位1字节完全由服务器Id决定
     * @param id server Id
     * @return the Session Id
     */
    public static long initializeNextSessionId(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8; // 生成下一次会话ID
        nextSid = nextSid | (id << 56);
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // 这是一个不太可能的边缘情况
        }
        return nextSid;
    }

touchSession确定ticktime

touchSession是每次接收到客户端的消息之后就进行的操作,其主要是延长对应的session的超时的时间.

    public synchronized boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {//对应session不存在
            logTraceTouchInvalidSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
            return false;
        }

        if (s.isClosing()) {//对应session已经被标识为关闭
            logTraceTouchClosingSession(sessionId, timeout);//计算其对应的时间,移除原来位置的session,加入到新的session
            return false;
        }
        //更新会话失效时间
        updateSessionExpiry(s, timeout);
        return true;
    }

基本方法

    // 确定session的timeout和id
    public long createSession(int sessionTimeout) {
        long sessionId = nextSessionId.getAndIncrement();
        trackSession(sessionId, sessionTimeout);
        return sessionId;
    }

    // 删除会话
    public synchronized void removeSession(long sessionId) {
        LOG.debug("Removing session 0x{}", Long.toHexString(sessionId));
        // 删除会话ID
        SessionImpl s = sessionsById.remove(sessionId);
        // 删除会话timeout
        sessionsWithTimeout.remove(sessionId);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "SessionTrackerImpl --- Removing session 0x" + Long.toHexString(sessionId));
        }
        if (s != null) {// 在会话失效队列中删除
            sessionExpiryQueue.remove(s);
        }
    }

总结

本章对会话的维护进行了简单分析,下一章将对会话的创建进行细节分析。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-29 16:23:14  更:2021-11-29 16:23:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 13:50:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码