IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ZooKeeper : Curator框架重试策略和Session API介绍 -> 正文阅读

[大数据]ZooKeeper : Curator框架重试策略和Session API介绍

ZooKeeper : Curator框架重试策略和Session API介绍

在学习Curator框架API之前,可以先了解Java客户端原生API,这样不仅可以更好的理解Curator框架API,还可以突出Curator框架的方便和强大。

Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API简化了ZooKeeper的操作。Curator框架主要解决了三类问题:

  • 封装ZooKeeper ClientZooKeeper Server之间的连接处理(提供连接重试机制等)。
  • 提供了一套Fluent风格的API,并且在Java客户端原生API的基础上进行了增强(创捷节点、删除节点等)。
  • 提供ZooKeeper各种应用场景(分布式锁、leader选举、共享计数器、分布式队列等)的抽象封装。

博主将Curator框架API分为SessionZnodeACLWatcherTransaction这几个部分来进行介绍,限于篇幅原因,本篇博客只介绍Session API以及其中的重试策略,之后的博客会介绍其他API的使用。博主使用的Curator框架版本是5.2.0ZooKeeper版本是3.6.3

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>

在这里插入图片描述
在这里插入图片描述

重试策略

Java客户端原生API中,客户端与服务端的连接是没有提供连接重试机制的,如果客户端需要重连,就只能将上一次连接的Session IDSession Password发送给服务端进行重连。而Curator框架提供了客户端与服务端的连接重试机制,并且可以通过Fluent风格的API来给连接添加重试策略。

RetryPolicy接口是重试策略的抽象,allowRetry方法用来判断是否允许重试。

package org.apache.curator;

import org.apache.zookeeper.KeeperException;

/**
 * 重连策略的抽象
 */
public interface RetryPolicy
{
    /**
     * 当操作由于某种原因失败时调用,此方法应返回true以进行另一次尝试
     * retryCount – 到目前为止重试的次数(第一次为0)
     * elapsedTimeMs – 自尝试操作以来经过的时间(以毫秒为单位)
     * sleeper – 使用它来睡眠,不要调用Thread.sleep
     */
    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

    /**
     * 当操作因特定异常而失败时调用,此方法应返回true以进行另一次尝试
     */
    default boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException)
        {
            final int rc = ((KeeperException) exception).code().intValue();
            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
        }
        return false;
    }
}

RetryPolicy接口的关系图如下所示:在这里插入图片描述
SleepingRetry抽象类:

package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import java.util.concurrent.TimeUnit;

abstract class SleepingRetry implements RetryPolicy
{
    private final int n;

    protected SleepingRetry(int n)
    {
        this.n = n;
    }

    // made public for testing
    public int getN()
    {
        return n;
    }

    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        if ( retryCount < n )
        {
            try
            {
                sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
        return false;
    }

    protected abstract long  getSleepTimeMs(int retryCount, long elapsedTimeMs);
}

重试n次(有次数限制),重试之前先进行睡眠,睡眠时间由getSleepTimeMs方法得到。

SessionFailedRetryPolicy类:

package org.apache.curator;

import org.apache.zookeeper.KeeperException;

/**
 * Session过期导致操作失败时的重连策略
 */
public class SessionFailedRetryPolicy implements RetryPolicy
{

    private final RetryPolicy delegatePolicy;

    public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
    {
        this.delegatePolicy = delegatePolicy;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
    }

    @Override
    public boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException.SessionExpiredException )
        {
            return false;
        }
        else
        {
            return delegatePolicy.allowRetry(exception);
        }
    }
}

这里只是增加了对SessionExpiredException这种异常的判断,当遇到Session过期异常时,不再进行重连,即返回false。而其他的所有业务全部委托给delegatePolicy实例。因为RetryPolicy接口的allowRetry(Throwable exception)方法有默认实现:

    default boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException)
        {
            final int rc = ((KeeperException) exception).code().intValue();
            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
        }
        return false;
    }

当遇到Session过期异常时,允许进行重连(rc == KeeperException.Code.SESSIONEXPIRED.intValue())。SessionFailedRetryPolicy这种重连策略用的不多,这里就不详细介绍了。

RetryForever类:

package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * 始终允许重试
 */
public class RetryForever implements RetryPolicy
{
    private static final Logger log = LoggerFactory.getLogger(RetryForever.class);

    // 重试间隔时间,单位毫秒
    private final int retryIntervalMs;

    public RetryForever(int retryIntervalMs)
    {
        checkArgument(retryIntervalMs > 0);
        this.retryIntervalMs = retryIntervalMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        try
        {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.warn("Error occurred while sleeping", e);
            return false;
        }
        return true;
    }
}

重试没有次数限制,一般也不常用。

RetryNTimes类(继承SleepingRetry抽象类):

package org.apache.curator.retry;

/**
 * Retry policy that retries a max number of times
 */
public class RetryNTimes extends SleepingRetry
{
    private final int sleepMsBetweenRetries;

    public RetryNTimes(int n, int sleepMsBetweenRetries)
    {
        super(n);
        this.sleepMsBetweenRetries = sleepMsBetweenRetries;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return sleepMsBetweenRetries;
    }
}

继承了SleepingRetry抽象类,没有重写allowRetry方法,因此也是重试n次,实现getSleepTimeMs方法,改方法返回重试之间的睡眠时间。

RetryOneTime类(继承了RetryNTimes类):

public class RetryOneTime extends RetryNTimes
{
    public RetryOneTime(int sleepMsBetweenRetry)
    {
        super(1, sleepMsBetweenRetry);
    }
}

只重试一次的重试策略。

ExponentialBackoffRetry类(继承了SleepingRetry抽象类):

package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;

public class ExponentialBackoffRetry extends SleepingRetry
{
    private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final int baseSleepTimeMs;
    private final int maxSleepMs;

    /**
     * baseSleepTimeMs – 重试之间等待的初始时间
     * maxRetries – 最大重试次数
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    /**
     * baseSleepTimeMs – 重试之间等待的初始时间
     * maxRetries – 最大重试次数
     * maxSleepMs – 每次重试时休眠的最长时间(以毫秒为单位)
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
    {
        super(validateMaxRetries(maxRetries));
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @VisibleForTesting
    public int getBaseSleepTimeMs()
    {
        return baseSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        // copied from Hadoop's RetryPolicies.java
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        if ( sleepMs > maxSleepMs )
        {
            log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }

    private static int validateMaxRetries(int maxRetries)
    {
        if ( maxRetries > MAX_RETRIES_LIMIT )
        {
            log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            maxRetries = MAX_RETRIES_LIMIT;
        }
        return maxRetries;
    }
}

重试maxRetries次,maxRetries如果大于最大重试次数限制(MAX_RETRIES_LIMIT),即大于29maxRetries就会被MAX_RETRIES_LIMIT覆盖,否则不改变maxRetries的大小。有意思的是getSleepTimeMs方法获取的重试之间的睡眠时间并不是不变的,而是随机得到的,并且随着重试次数的增加,睡眠时间的随机范围不断扩大(右边界不断扩大),如果随机得到的睡眠时间超过maxSleepMs(如果没有被指定,默认为DEFAULT_MAX_SLEEP_MS, 即 Integer.MAX_VALUE),会被maxSleepMs覆盖,而随机得到的睡眠时间小于baseSleepTimeMs,也会被baseSleepTimeMs覆盖。

package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;

public class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry
{
    private final int maxSleepTimeMs;

    /**
     * baseSleepTimeMs – 重试之间等待的初始时间
     * maxSleepTimeMs – 重试之间等待的最长时间
     * maxRetries – 重试的最大次数
     */
    public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries)
    {
        super(baseSleepTimeMs, maxRetries);
        this.maxSleepTimeMs = maxSleepTimeMs;
    }

    @VisibleForTesting
    public int getMaxSleepTimeMs()
    {
        return maxSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return Math.min(maxSleepTimeMs, super.getSleepTimeMs(retryCount, elapsedTimeMs));
    }
}

maxSleepTimeMs属性和maxSleepMs属性作用是一样的,都是为了限制睡眠时间的最大取值。只是maxSleepTimeMs属性不会提供默认值,因此必须要指定它的大小。个人感觉maxSleepTimeMs属性完全可以不需要。

连接

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.*;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 10:30
 * @Blog: https://kaven.blog.csdn.net
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

public class Application{

    private static final String SERVER_PROXY = "192.168.1.184:9000";
    private static final int TIMEOUT = 40000;

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .namespace("curator")
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(TIMEOUT)
                .sessionTimeoutMs(TIMEOUT)
                .build();

        curator.start();
        if (curator.getState().equals(CuratorFrameworkState.STARTED)) {
            System.out.println("连接成功!");
        }
    }
}

输出:

连接成功!

这就是Fluent风格的API

  • connectStringZooKeeper服务端的地址。
  • namespace:命名空间,类似chroot的功能,之后在该客户端上的操作,都是基于该命名空间。
  • retryPolicy:重试策略。
  • connectionTimeoutMs:连接超时时间。
  • sessionTimeoutMsSession超时时间。

连接之后就可以通过Curator框架对ZooKeeper服务端进行操作了。Curator框架重试策略和Session API介绍就到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-26 08:55:06  更:2021-11-26 08:56:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 15:35:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码