IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ZooKeeper(三)客户端服务端会话空闲超时管理分桶策略源码分析 -> 正文阅读

[大数据]ZooKeeper(三)客户端服务端会话空闲超时管理分桶策略源码分析

客户端连接源码分析

ZKClient 客户端,Curator 客户端
先下结论:

Client 要创建一个连接,其首先会在本地创建一个 ZooKeeper 对象,用于表示其所连接上的 Server。
连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中。
连接关闭后,这个代表 Server 的 zk 对象会被删除。

我们知道常用的ZK客户端技术有ZKClient 客户端,Curator 客户端,而客户端在连接ZK Server的时候,会配置集群信息,而连接集群中具体哪一台服务器,采用轮询的方式,先将集群配置信息打散,打散以后再轮询(默认情况,当然可以指定重连策略)
ZKClient源码分析:

public class ZKClientTest {
	// 指定 zk 集群
	private static final String CLUSTER = "zkOS:2181";
	// 指定节点名称
	private static final String PATH = "/mylog";
	public static void main(String[] args) {
		// ---------------- 创建会话 -----------
		// 创建 zkClient
		ZkClient zkClient = new ZkClient(CLUSTER);
		// 为 zkClient 指定序列化器
		zkClient.setZkSerializer(new SerializableSerializer());
		// ---------------- 创建节点 -----------
		// 指定创建持久节点
		CreateMode mode = CreateMode.PERSISTENT;
		// 指定节点数据内容
		String data = "first log";
		// 创建节点
		String nodeName = zkClient.create(PATH, data, mode);
		...

追踪ZKClient源码,看下是如何连接的,从ZkClient 构造开始:

public class ZkClient implements Watcher {
	...
	public ZkClient(String serverstring) {
        this(serverstring, Integer.MAX_VALUE);
    }
    public ZkClient(String zkServers, int connectionTimeout) {
    	//关键点 看到创建了ZkConnection对象
        this(new ZkConnection(zkServers), connectionTimeout);
    }
    ...
    //构造一直走,会走到下面该方法
    public ZkClient(final IZkConnection zkConnection, final int connectionTimeout, final ZkSerializer zkSerializer, final long operationRetryTimeout) {
        if (zkConnection == null) {
            throw new NullPointerException("Zookeeper connection is null!");
        }
        //将创建的ZkConnection,赋值到_connection 成员变量上
        _connection = zkConnection;
        _zkSerializer = zkSerializer;
        _operationRetryTimeoutInMillis = operationRetryTimeout;
        _isZkSaslEnabled = isZkSaslEnabled();
        connect(connectionTimeout, this);
    }

    public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) throws ZkInterruptedException, ZkTimeoutException, IllegalStateException {
        boolean started = false;
        acquireEventLock();
        try {
            setShutdownTrigger(false);
            _eventThread = new ZkEventThread(_connection.getServers());
            _eventThread.start();
            //调用ZkConnection.connect进行连接
            _connection.connect(watcher);

            LOG.debug("Awaiting connection to Zookeeper server");
            boolean waitSuccessful = waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS);
            if (!waitSuccessful) {
                throw new ZkTimeoutException("Unable to connect to zookeeper server '" + _connection.getServers() + "' with timeout of " + maxMsToWaitUntilConnected + " ms");
            }
            started = true;
        } finally {
            getEventLock().unlock();

            // we should close the zookeeper instance, otherwise it would keep
            // on trying to connect
            if (!started) {
                close();
            }
        }
    }
}

通过上面源码追踪,看到ZKClient连接实际上是通过ZkConnection.connect方法进行连接的,我们继续追踪ZkConnection

public class ZkConnection implements IZkConnection {
	...
	//关键对象ZooKeeper 
	private ZooKeeper _zk = null;
	...
    public ZkConnection(String zkServers, int sessionTimeOut) {
        _servers = zkServers;
        _sessionTimeOut = sessionTimeOut;
    }
    
    @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                //!!!可以看到实际上ZKCLient与服务端连接,靠的就是ZooKeeper对象
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }
}

Curator 源码分析:

public class FluentTest {
	public static void main(String[] args) throws Exception {
		// ---------------- 创建会话 -----------
		// 创建重试策略对象:重试间隔时间是1秒,最多重试 3 次
		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		// 创建客户端
		CuratorFramework client = CuratorFrameworkFactory
									.builder()
									.connectString("zkOS:2181")
									.sessionTimeoutMs(15000)
									.connectionTimeoutMs(13000)
									.retryPolicy(retryPolicy)
									//namespace:根路径,所有操作都是基于该路径之上
									.namespace("logs")
									.build();
		// 开启客户端
		client.start();
		...

追踪Curator源码,看下是如何连接的,从client.start()开始:

public class CuratorFrameworkImpl implements CuratorFramework{
	...
    @Override
    public void start(){
        log.info("Starting");
        if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) ){
            throw new IllegalStateException("Cannot be started more than once");
        }

        try{
			...
            this.getConnectionStateListenable().addListener(listener);

            client.start();
            ...
        }catch ( Exception e ){
            ThreadUtils.checkInterrupted(e);
            handleBackgroundOperationException(null, e);
        }
    }

}

关注client.start();这个方法:

public class CuratorZookeeperClient implements Closeable{
	...
    public void start() throws Exception
    {
        log.debug("Starting");

        if ( !started.compareAndSet(false, true) )
        {
            throw new IllegalStateException("Already started");
        }

        state.start();
    }
	...
}

继续追踪state.start();

class ConnectionState implements Watcher, Closeable{
	...
    void start() throws Exception{
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

    synchronized void reset() throws Exception{
        log.debug("reset");

        instanceIndex.incrementAndGet();

        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        handleHolder.closeAndReset();
        handleHolder.getZooKeeper();   // initiate connection
    }
	```
}

关键点看handleHolder.getZooKeeper()方法:

class HandleHolder{
	...
    ZooKeeper getZooKeeper() throws Exception{
        return (helper != null) ? helper.getZooKeeper() : null;
    }
	...
}

class Helper{
	private final Data data;
	...
    ZooKeeper getZooKeeper() throws Exception{
        return data.zooKeeperHandle;
    }
    ...
}

直接从data里面取了,Hepler是什么时候创建的呢?回到org.apache.curator.ConnectionState#reset,看handleHolder.closeAndReset()方法:

class HandleHolder{
	...

    void closeAndReset() throws Exception{
        internalClose(0);

        Helper.Data data = new Helper.Data();
        helper = new Helper(data){
            @Override
            ZooKeeper getZooKeeper() throws Exception{
                synchronized(this){
                    if ( data.zooKeeperHandle == null ){
                        resetConnectionString(ensembleProvider.getConnectionString());
                        data.zooKeeperHandle = zookeeperFactory.newZooKeeper(data.connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }

                    helper = new Helper(data);

                    return super.getZooKeeper();
                }
            }
        };
    }
	...
}

我们看下data.zooKeeperHandle到底是怎么创建的:

public class NonAdminZookeeperFactory implements ZookeeperFactory{
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception{
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}

可以看到,无论哪种客户端技术,最终都会在本地创建一个ZooKeeper对象,接下来我们分析ZK源码中的ZooKeeper对象

ZK源码中客户端对象ZooKeeper
我们找到ZK源码中的ZooKeeper对象代码(下面是构造代码):

// 跟构造
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
    throws IOException
{
    this(connectString, sessionTimeout, watcher, false);
}

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    watchManager.defaultWatcher = watcher;

    // 创建一个zk集群字符串解析器,将解析出的ip与port构建为一个地址实例,放入到缓存集合
    ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
    // 将缓存集合中的地址打散
    HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
    // 创建一个连接实例
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // 启动连接
    cnxn.start();

ConnectStringParser connectStringParser = new ConnectStringParser(connectString)
创建一个zk集群字符串解析器,将解析出的ip与port构建为一个地址实例,放入到缓存集合

public ConnectStringParser(String connectString) {
    // parse out chroot, if any
    // 解析chroot,如果有的话(其他的一种写法,不用关注)
    int off = connectString.indexOf('/');
    if (off >= 0) {
        String chrootPath = connectString.substring(off);
        // ignore "/" chroot spec, same as null
        if (chrootPath.length() == 1) {
            this.chrootPath = null;
        } else {
            PathUtils.validatePath(chrootPath);
            this.chrootPath = chrootPath;
        }
        connectString = connectString.substring(0, off);
    } else {
        this.chrootPath = null;
    }
	  //逗号分割
    String hostsList[] = connectString.split(",");
    for (String host : hostsList) {
        int port = DEFAULT_PORT;
        int pidx = host.lastIndexOf(':');
        if (pidx >= 0) {
            // otherwise : is at the end of the string, ignore
            if (pidx < host.length() - 1) {
                port = Integer.parseInt(host.substring(pidx + 1));
            }
            host = host.substring(0, pidx);
        }
        //解析出主机和端口后生成地址并添加到serverAddresses
        serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
    }
}

HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());

创建主机提供者,把将缓存集合中的地址打散

public StaticHostProvider(Collection<InetSocketAddress> serverAddresses) {
    // 实例化一个地址解析器
    this.resolver = new Resolver() {
        // 根据主机名获取其对应的所有ip
        @Override
        public InetAddress[] getAllByName(String name) throws UnknownHostException {
            return InetAddress.getAllByName(name);
        }
    };
    // 将地址打散
    init(serverAddresses);
}

private void init(Collection<InetSocketAddress> serverAddresses) {
   if (serverAddresses.isEmpty()) {
       throw new IllegalArgumentException(
               "A HostProvider may not be empty!");
   }
   // 初始化本地的sererAddresses
   this.serverAddresses.addAll(serverAddresses);
   // 将serverAddresses中的元素打散
   Collections.shuffle(this.serverAddresses);
}

打散的目的在于负载均衡,不然每个客户端轮询都会连上第一个

cnxn = new ClientCnxn(…);

创建一个连接实例
cnxn.start();

启动连接

public void start() {
    // 启动连接线程
    sendThread.start();
    // 启动事件线程
    eventThread.start();
}

查看启动连接线程sendThread的run方法

public void run() {
    clientCnxnSocket.introduce(this,sessionId);
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    long lastPingRwServer = Time.currentElapsedTime();
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    InetSocketAddress serverAddress = null;
    // 若当前连接对象处于激活状态
    while (state.isAlive()) {
        try {
            // 若当前连接没有连接着server
            if (!clientCnxnSocket.isConnected()) {
                // 若不是第一次连接,则随机sleep一会儿,小于1秒
                // 休息一会儿减少服务器资源消耗
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // don't re-establish connection if we are closing
                // 若sleep过后,当前连接被关闭了,或不可用了,则直接结束
                if (closing || !state.isAlive()) {
                    break;
                }
                // rwServerAddress是一个动态配置的地址,当前Client会先连接该地址
                // 若连接失败,则再连接其它地址
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    // 获取要连接的zkServer的地址
                    serverAddress = hostProvider.next(1000);
                }
                // 开启连接尝试,有可能连接不上,连接不上会循环获取下一个地址继续尝试连接
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }

            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                           LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
            
            if (to <= 0) {
                String warnInfo;
                warnInfo = "Client session timed out, have not heard from server in "
                    + clientCnxnSocket.getIdleRecv()
                    + "ms"
                    + " for sessionid 0x"
                    + Long.toHexString(sessionId);
                LOG.warn(warnInfo);
                throw new SessionTimeoutException(warnInfo);
            }
            if (state.isConnected()) {
            	//1000(1 second) is to prevent race condition missing to send the second ping
            	//also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                		((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = Time.currentElapsedTime();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }

            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else if (e instanceof SocketException) {
                    LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                } else {
                    LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                    Long.toHexString(getSessionId()),
                                    serverAddress,
                                    RETRY_CONN_MSG,
                                    e);
                }
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    }
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
            "SendThread exited loop for session: 0x"
                   + Long.toHexString(getSessionId()));
}

while (state.isAlive()) {
判断当前连接对象是否处于激活状态

// 一共有这么多状态
public enum States {
    CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
    CLOSED, AUTH_FAILED, NOT_CONNECTED;

    public boolean isAlive() {
        return this != CLOSED && this != AUTH_FAILED;
    }

    /**
     * Returns whether we are connected to a server (which
     * could possibly be read-only, if this client is allowed
     * to go to read-only mode)
     * */
    public boolean isConnected() {
        return this == CONNECTED || this == CONNECTEDREADONLY;
    }
}

serverAddress = hostProvider.next(1000);

获取要连接的zkServer的地址

// 轮询获取要连接的server的地址
// 如果所有主机都尝试过一次,则等待spinDelay毫秒。
public InetSocketAddress next(long spinDelay) {
    // 轮询,索引加一后和主机数量取余
    currentIndex = ++currentIndex % serverAddresses.size();
    // lastIndex用于记录上一次连接上的地址
    // 只有当zk是单机模式时,才会出现currentIndex 与 lastIndex 相等的情况(或者已经轮询一轮回来了)
    // 在zk为单机模式时或者已经轮询一轮回来了,若指定了spinDelay,则sleep spinDelay时间
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {   // 处理第一次连接的情况
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }

    // 从地址列表中获取到“轮询索引”对应的地址
    InetSocketAddress curAddr = serverAddresses.get(currentIndex);
    try {
        // 根据地址获取该地址对象的主机名
        // 如果主机名可用,则计算为主机名,否则返回IP地址的字符串表示。
		// 在Java 7中,我们有一个方法getHostString,但是早期版本不支持它。
		// 这个方法是为InetSocketAddress.getHostString()提供一个替换。
        String curHostString = getHostString(curAddr);
        // this.resolver.getAllByName(curHostString)  获取主机名对应的所有的ip,其结果是一个数组(即一个主机名对应多个IP地址)
        // 一个主机多个网卡,就会一个主机名多个IP
        List<InetAddress> resolvedAddresses = new ArrayList<InetAddress>(Arrays.asList(this.resolver.getAllByName(curHostString)));
        if (resolvedAddresses.isEmpty()) {
            return curAddr;
        }
        // 将ip列表打散
        Collections.shuffle(resolvedAddresses);
        // 从打散的ip列表中取第一个ip作为要连接的地址
        return new InetSocketAddress(resolvedAddresses.get(0), curAddr.getPort());
    } catch (UnknownHostException e) {
        return curAddr;
    }
}

startConnect(serverAddress);

开启连接尝试(有可能连接不上,连接不上会循环获取下一个地址继续尝试连接)

private void startConnect(InetSocketAddress addr) throws IOException {
    // initializing it for new connection
    saslLoginFailed = false;
    // 修改状态为CONNECTING
    state = States.CONNECTING;
    // 设置当前连接的名称
    setName(getName().replaceAll("\\(.*\\)",
            "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    // 若开启了对客户端的SASL验证,则创建一个SASL的客户端
    // Simple Authentication and Security Layer,简单认证与安全层 (SASL)
    // 是一种用来扩充C/S模式验证能力的机制。
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =
                new ZooKeeperSaslClient(
                        principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
              + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
              Watcher.Event.EventType.None,
              Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);
    // 连接指定的地址
    clientCnxnSocket.connect(addr);
}

clientCnxnSocket.connect(addr);
连接指定的地址

@Override
void connect(InetSocketAddress addr) throws IOException {
    // 获取nio的客户端channel
    SocketChannel sock = createSock();
    try {
        // 注册并连接
       registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

/**
 * 可以看到具体连接用的就是jdk的NIO
 * 
 */
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    // 将客户端Channel注册到selector
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // channel连接指定Server地址
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        //连接成功会返回true,会进行一些相关的初始化
        sendThread.primeConnection();
    }
}

服务端连接源码分析

ZooKeeper会话理论知识
会话是 zk 中最重要的概念之一,客户端与服务端之间的任何交互操作都与会话相关。

ZooKeeper 客户端启动时,首先会与 zk 服务器建立一个 TCP 长连接。连接一旦建立,客户端会话的生命周期也就开始了。

(1) 会话状态
常见的会话状态有三种:

CONNECTING:连接中。Client 要创建一个连接,其首先会在本地创建一个 zk 对象,用于表示其所连接上的 Server。
CONNECTED:已连接。连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中。
CLOSED:已关闭。连接关闭后,这个代表 Server 的 zk 对象会被删除。

(2) 会话连接超时管理—客户端维护

我们这里的会话连接超时管理指的是,客户端所发起的服务端连接时间记录,是从客户端当前会话第一次发起服务端连接的时间开始计时。

ZK是CP架构的,服务端在进行数据同步的时候是不对外提供服务的,但是这个过程是非常快的,对于客户端来说,在连接超时时间内,会一直尝试连接,直到成功,所以服务端不对外提供服务的过程,客户端是感知不到的。

(3) 会话连接事件

客户端与服务端的长连接失效后,客户端将进行重连。在重连过程中客户端会产生三种会话连接事件:

CONNECTION_LOSS:连接丢失
SESSION_MOVED:会话转移。若在客户端连接超时时限范围内又连接上了 Server,且连接的 Server 与之前的不是同一个(集群中的其他机器),则会发生会话转移。
SESSION_EXPIRED:会话失效。若在客户端连接超时时限范围外连接上了 Server,而该Server 中存放的该会话的 sessionId 又被 Server 给干掉了,则该会话失效。

(4) 会话空闲超时管理—服务端维护

会话连接超时针对客户端来说的,会话空闲超时,是针对服务端的

服务器为每一个客户端的会话都记录着上一次交互后空闲的时长,及从上一次交互结束开始会话空闲超时的时间点。一旦空闲时长超时,服务端就会将该会话的 SessionId 从服务端清除。这也就是为什么客户端在空闲时需要定时向服务端发送心跳,就是为了维护这个会话长连接的。服务器是通过空闲超时管理来判断会话是否发生中断的。

服务端对于会话空闲超时管理,采用了一种特殊的方式——分桶策略。

A、分桶策略

分桶策略是指,将空闲超时时间相近的会话放到同一个桶中来进行管理,以减少管理的复杂度。在检查超时时,只需要检查桶中剩下的会话即可,因为没有超时的会话已经被移出了桶,而桶中存在的会话就是超时的会话。

zk 对于会话空闲的超时管理并非是精确的管理,即并非是一超时马上就执行相关的超时操作。

B、 分桶依据

分桶的计算依据为:

在这里插入图片描述
CurrentTime:当前时间(这是时间轴上的时间)
SessionTimeout:会话超时时间(这是一个时间范围)
ExpirationTime:当前会话下一次超时的时间点(这是时间轴上的时间)
ExpirationInterval:桶的大小(这是一个时间范围)
BucketTime:代表的是当前会话下次超时的时间点所在的桶
在这里插入图片描述
从以上公式可知,一个桶的大小为 ExpirationInterval 时间。只要 ExpirationTime 落入到同一个桶中,系统就会对其中的会话超时进行统一管理。

服务端连接源码分析

找到ZooKeeperServer.startup方法,一但Server启动就会触发该方法

// 启动的会执行该方法
public synchronized void startup() {
    if (sessionTracker == null) {
        // 创建一个sessionTracker线程
        createSessionTracker();
    }
    // 开启sessionTracker线程
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    setState(State.RUNNING);
    notifyAll();
}

createSessionTracker();

创建一个sessionTracker(Session跟踪器)线程’

protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());
}


//SessionTrackerImpl调用的构造
public SessionTrackerImpl(SessionExpirer expirer,
        ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
        long sid, ZooKeeperServerListener listener)
{
    super("SessionTracker", listener);
    this.expirer = expirer;
    // 桶的大小,即过期时间间隔
    this.expirationInterval = tickTime;
    this.sessionsWithTimeout = sessionsWithTimeout;
    // 计算当前时间所在的会话桶
    nextExpirationTime = roundToInterval(Time.currentElapsedTime());
    // 初始化下一个会话一但创建所用的sessionId
    this.nextSessionId = initializeNextSession(sid);
    //从内存中取出之前保存的会话数据,重新加载到sessionTracker(应该是选举后重启之类的场景)
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        // 会话不存在则创建,并将会话放入相应桶中
        addSession(e.getKey(), e.getValue());
    }
}


// 计算指定时间所在的桶
private long roundToInterval(long time) {
    // We give a one interval grace period
    // 即计算当前时间已经走过的时间桶数量+1,在乘以时间桶大小,得到我的桶的过期时间点
    return (time / expirationInterval + 1) * expirationInterval;
}


// 创建当前会话,并将会话放入相应桶中
synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        // 创建会话实例,刚创建的会话不在任何桶中
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
        // 将会话放入到缓存map,注意这个map不是会话桶
        sessionsById.put(id, s);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Adding session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    } else {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- Existing session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
    }
    // 将会话放入到桶中
    touchSession(id, sessionTimeout);
}

//先看下创建会话做了什么
public static class SessionImpl implements Session {
    SessionImpl(long sessionId, int timeout, long expireTime) {
        this.sessionId = sessionId;
        this.timeout = timeout;
        // 注意,这个tickTime用于记录的是当前会话所在的桶
        // 这个初值为0,表示当前会话不在任何桶中
        this.tickTime = expireTime;
        isClosing = false;
    }
	...
}


// 将会话放入到桶中,该方法的返回值表示当前会话是否有效
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                                 ZooTrace.CLIENT_PING_TRACE_MASK,
                                 "SessionTrackerImpl --- Touch session: 0x"
                + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    // 根据session从缓存map中获取相应Session
    SessionImpl s = sessionsById.get(sessionId);

    // Return false, if the session doesn't exists or marked as closing
    // 若会话不存在,或会话关闭了,则返回false
    if (s == null || s.isClosing()) {
        return false;
    }
    // 计算本次会话交互的下次超时时间点所在的会话桶
    long expireTime = roundToInterval(Time.currentElapsedTime() + timeout);
    // s.tickTime用于记录当前会话所在的桶
    // 若s.tickTime < expireTime,说明应该换桶了
    // 若s.tickTime == expireTime,说明不用换桶
    // 注意不可能出现s.tickTime > expireTime的情况
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        // 不需要换桶,不做任何处理
        return true;
    }

    // 代码走到这里,说明s.tickTime < expireTime成立
    // 下面要进行换桶了
    // sessionSets是桶集合,key为桶的边界时间点,value为桶实例SessionSet
    SessionSet set = sessionSets.get(s.tickTime);
    // 若该桶不空,则从该桶中将会话移除
    if (set != null) {
        set.sessions.remove(s);
    }
    // 更新当前会话所在桶的id,即桶的边界桶
    s.tickTime = expireTime;
    // 从桶集合中查找该桶
    set = sessionSets.get(s.tickTime);
    // 若该桶不存在,则创建一个桶,然后将该新建的桶放入桶集合
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    // 将会话放到新桶
    set.sessions.add(s);
    return true;
}


//SessionSet就是桶实例,里面放了Session
static class SessionSet {
    HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}

上面touchSession方法介绍的是在创建sessionTracker线程时调用的,其实还有很多场景都会调用该方法,比如:

会话与当前Server交互时

// 只要会话与当前Server与交互,就会触发该方法的执行
void touch(ServerCnxn cnxn) throws MissingSessionException {
    if (cnxn == null) {
        return;
    }
    long id = cnxn.getSessionId();
    int to = cnxn.getSessionTimeout();
    // 上面说过,touchSession方法的返回值表示当前会话是否有效
    // 若当前会话是失效的,则抛出异常。否则判断是否换桶,进行换桶
    if (!sessionTracker.touchSession(id, to)) {
        throw new MissingSessionException(
                "No session with sessionid 0x" + Long.toHexString(id)
                + " exists, probably expired and removed");
    }
}

发生会话丢失后,客户端重新发起连接请求时

// 当发生会话丢失后,客户端重新连接上了Server,此时要对session进行验证判断是否失效
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
    // 判断指定会话是否有效
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                 "Session 0x" + Long.toHexString(sessionId) +
                " is valid: " + rc);
    }
    // 根据rc不同的结果,采用不同的会话处理方式
    finishSessionInit(cnxn, rc);
}



public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    // register with JMX
    try {
        // 若当前Session仍有效,同意其注册
        if (valid) {
            serverCnxnFactory.registerConnection(cnxn);
        }
    } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
    }

    try {
        // 创建一个连接响应对象
        ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                        // longer valid
                        valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        if (!cnxn.isOldClient) {
            bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.remaining() - 4).rewind();
        cnxn.sendBuffer(bb);    

        // 若连接无效,则向客户端返回超时响应,否则启用该连接
        if (!valid) {
            LOG.info("Invalid session 0x"
                    + Long.toHexString(cnxn.getSessionId())
                    + " for client "
                    + cnxn.getRemoteSocketAddress()
                    + ", probably expired");
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
        } else {
            LOG.info("Established session 0x"
                    + Long.toHexString(cnxn.getSessionId())
                    + " with negotiated timeout " + cnxn.getSessionTimeout()
                    + " for client "
                    + cnxn.getRemoteSocketAddress());
            // 启用接收到的连接
            cnxn.enableRecv();
        }
            
    } catch (Exception e) {
        LOG.warn("Exception while establishing session, closing", e);
        cnxn.close();
    }
}

上面只说了桶的创建和换桶,过期的处理没说,过期处理的操作在startSessionTracker()中

startSessionTracker();

开启sessionTracker线程,我们看下SessionTrackerImpl的run方法:

@Override
synchronized public void run() {
    try {
        // 只要当前server处理运行态,则该while就不会停止
        while (running) {
            currentTime = Time.currentElapsedTime();
            // nextExpirationTime是此时还在使用的那个桶的边界时间
            // 刚启动的时候会为nextExpirationTime赋值
			// 如果当前时间还没超过桶的边界时间,说明桶还没过期,不需要处理
            if (nextExpirationTime > currentTime) {
                this.wait(nextExpirationTime - currentTime);
                continue;
            }
            // 代码走到这里,说明nextExpirationTime <= currentTime成立
            // 说明nextExpirationTime桶已经过期,需要将该桶从桶集合中清除,
            // 需要将该桶中的会话关闭
            SessionSet set;
            // 清除当前桶
            set = sessionSets.remove(nextExpirationTime);
            // 关闭该桶中的所有会话
            if (set != null) {
                for (SessionImpl s : set.sessions) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
            // 获取下一个桶id
            nextExpirationTime += expirationInterval;
        }
    } catch (InterruptedException e) {
        handleException(this.getName(), e);
    }
    LOG.info("SessionTrackerImpl exited loop!");
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-26 12:08:54  更:2021-07-26 12:09:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年3日历 -2024/3/29 22:35:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码