ZooKeeper: An Introduction to Curator Retry Policies and the Session API
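Before learning the Curator API, it helps to get familiar with the native Java client API first. Doing so not only makes the Curator API easier to understand, it also highlights how convenient and powerful Curator is.
Curator is a fairly complete ZooKeeper client framework that simplifies ZooKeeper operations through a set of high-level APIs. It mainly solves three kinds of problems:
- It encapsulates connection handling between the ZooKeeper client and the ZooKeeper server (including a connection retry mechanism).
- It provides a Fluent-style API and enhances the native Java client API (creating nodes, deleting nodes, and so on).
- It provides abstractions for common ZooKeeper use cases (distributed locks, leader election, shared counters, distributed queues, and so on).
I divide the Curator API into five parts: Session, Znode, ACL, Watcher, and Transaction. For reasons of space, this post covers only the Session API and its retry policies; later posts will cover the other parts. This post uses Curator 5.2.0 and ZooKeeper 3.6.3.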
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>
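Retry Policies
The native Java client API offers no configurable retry mechanism for the connection between client and server. If the application needs to re-establish a connection itself, it has to create a new client and send the previous session's Session ID and Session Password to the server. Curator, by contrast, provides a retry mechanism for the client-server connection, and a retry policy can be attached to the connection through the Fluent-style API.
The RetryPolicy interface is the abstraction for retry policies; its allowRetry methods decide whether a retry is allowed.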
package org.apache.curator;

import org.apache.zookeeper.KeeperException;

public interface RetryPolicy
{
    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

    default boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException)
        {
            final int rc = ((KeeperException) exception).code().intValue();
            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
        }
        return false;
    }
}
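To make the role of allowRetry more concrete, here is a minimal, self-contained sketch of a retry loop that consults a policy after each failed attempt. It does not use Curator at all: SimpleRetryPolicy, RetryLoopSketch, and callWithRetry are hypothetical names made up for this illustration, and Curator's real retry loop is more involved (it also sleeps and inspects the exception type).

```java
import java.util.concurrent.Callable;

/** Hypothetical, simplified stand-in for Curator's RetryPolicy (illustration only). */
interface SimpleRetryPolicy {
    boolean allowRetry(int retryCount, long elapsedTimeMs);
}

class RetryLoopSketch {
    /** Runs the task, asking the policy after each failure whether to try again. */
    static <T> T callWithRetry(Callable<T> task, SimpleRetryPolicy policy) throws Exception {
        long startMs = System.currentTimeMillis();
        int retryCount = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                long elapsedMs = System.currentTimeMillis() - startMs;
                // retryCount is the number of retries already performed (0 on the first failure).
                if (!policy.allowRetry(retryCount++, elapsedMs)) {
                    throw e; // the policy refused another attempt
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Allow at most 3 retries; the task fails twice and then succeeds.
        String result = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IllegalStateException("simulated failure");
            }
            return "ok";
        }, (retryCount, elapsedMs) -> retryCount < 3);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```

The policy only answers "may I try again?"; the loop itself owns the counting and the timing, which is the same division of labour the RetryPolicy interface suggests.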
[Figure: class diagram of the RetryPolicy interface and its implementations]
SleepingRetry abstract class:
package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import java.util.concurrent.TimeUnit;

abstract class SleepingRetry implements RetryPolicy
{
    private final int n;

    protected SleepingRetry(int n)
    {
        this.n = n;
    }

    public int getN()
    {
        return n;
    }

    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        if ( retryCount < n )
        {
            try
            {
                sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
        return false;
    }

    protected abstract long getSleepTimeMs(int retryCount, long elapsedTimeMs);
}
It retries at most n times. Before each retry it sleeps for the duration returned by getSleepTimeMs.
SessionFailedRetryPolicy class:
package org.apache.curator;

import org.apache.zookeeper.KeeperException;

public class SessionFailedRetryPolicy implements RetryPolicy
{
    private final RetryPolicy delegatePolicy;

    public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
    {
        this.delegatePolicy = delegatePolicy;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
    }

    @Override
    public boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException.SessionExpiredException )
        {
            return false;
        }
        else
        {
            return delegatePolicy.allowRetry(exception);
        }
    }
}
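This class only adds a check for SessionExpiredException: when the session has expired, it returns false and no further retry is made. Everything else is delegated to the delegatePolicy instance. The extra check is needed because the allowRetry(Throwable exception) method of the RetryPolicy interface has a default implementation: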
default boolean allowRetry(Throwable exception)
{
    if ( exception instanceof KeeperException)
    {
        final int rc = ((KeeperException) exception).code().intValue();
        return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
    }
    return false;
}
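With this default implementation, a retry is allowed when the session has expired (rc == KeeperException.Code.SESSIONEXPIRED.intValue()). SessionFailedRetryPolicy is not used very often, so it is not covered in more detail here.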
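The difference between the two decisions can be summarised in a small, self-contained model. This is only a sketch: the ErrorCode enum and the RetryDecisionModel class are hypothetical stand-ins for KeeperException.Code and the two allowRetry(Throwable) implementations, NONODE is included merely as an example of a non-retryable code, and the model assumes the delegate policy does not override the default method.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical stand-ins for the KeeperException codes discussed above. */
enum ErrorCode { CONNECTIONLOSS, OPERATIONTIMEOUT, SESSIONMOVED, SESSIONEXPIRED, NONODE }

class RetryDecisionModel {
    private static final Set<ErrorCode> DEFAULT_RETRYABLE = EnumSet.of(
            ErrorCode.CONNECTIONLOSS, ErrorCode.OPERATIONTIMEOUT,
            ErrorCode.SESSIONMOVED, ErrorCode.SESSIONEXPIRED);

    /** Models the default allowRetry(Throwable): four codes are retryable. */
    static boolean defaultAllowRetry(ErrorCode code) {
        return DEFAULT_RETRYABLE.contains(code);
    }

    /** Models SessionFailedRetryPolicy: veto session expiry, otherwise delegate. */
    static boolean sessionFailedAllowRetry(ErrorCode code) {
        if (code == ErrorCode.SESSIONEXPIRED) {
            return false;
        }
        return defaultAllowRetry(code);
    }

    public static void main(String[] args) {
        for (ErrorCode code : ErrorCode.values()) {
            System.out.println(code + ": default=" + defaultAllowRetry(code)
                    + ", sessionFailed=" + sessionFailedAllowRetry(code));
        }
    }
}
```

Only the SESSIONEXPIRED row differs between the two columns, which is exactly the one case the wrapper exists to change.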
RetryForever class:
package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;

public class RetryForever implements RetryPolicy
{
    private static final Logger log = LoggerFactory.getLogger(RetryForever.class);

    private final int retryIntervalMs;

    public RetryForever(int retryIntervalMs)
    {
        checkArgument(retryIntervalMs > 0);
        this.retryIntervalMs = retryIntervalMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        try
        {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.warn("Error occurred while sleeping", e);
            return false;
        }
        return true;
    }
}
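There is no limit on the number of retries; the policy stops only if the sleeping thread is interrupted. It is not commonly used either.
RetryNTimes class (extends the SleepingRetry abstract class):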
package org.apache.curator.retry;

public class RetryNTimes extends SleepingRetry
{
    private final int sleepMsBetweenRetries;

    public RetryNTimes(int n, int sleepMsBetweenRetries)
    {
        super(n);
        this.sleepMsBetweenRetries = sleepMsBetweenRetries;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return sleepMsBetweenRetries;
    }
}
RetryNTimes extends SleepingRetry without overriding allowRetry, so it also retries at most n times. It implements getSleepTimeMs, which returns a fixed sleep time between retries.
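As a rough illustration of what "retry at most n times with a fixed sleep" means in practice, the following self-contained sketch lists the sleeps that would occur if every attempt failed. FixedSleepModel is a hypothetical class, not Curator code, and it assumes the caller increments retryCount by one after each failed attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "retry n times with a fixed sleep" (not Curator code). */
class FixedSleepModel {
    /** Returns the sleeps (in ms) that would occur if every attempt failed. */
    static List<Long> sleepsWhenAlwaysFailing(int n, long sleepMsBetweenRetries) {
        List<Long> sleeps = new ArrayList<>();
        int retryCount = 0;
        // Mirrors the check in SleepingRetry.allowRetry: retry only while retryCount < n.
        while (retryCount < n) {
            sleeps.add(sleepMsBetweenRetries); // the sleep happens before the retry
            retryCount++;
        }
        return sleeps;
    }

    public static void main(String[] args) {
        List<Long> sleeps = sleepsWhenAlwaysFailing(3, 500);
        // One initial attempt plus one retry per recorded sleep.
        System.out.println("attempts=" + (sleeps.size() + 1) + ", sleeps=" + sleeps);
        // prints: attempts=4, sleeps=[500, 500, 500]
    }
}
```

Under these assumptions, n = 3 means up to four attempts in total: the original one plus three retries.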
RetryOneTime class (extends RetryNTimes):
public class RetryOneTime extends RetryNTimes
{
    public RetryOneTime(int sleepMsBetweenRetry)
    {
        super(1, sleepMsBetweenRetry);
    }
}
A retry policy that retries only once.
ExponentialBackoffRetry class (extends the SleepingRetry abstract class):
package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;

public class ExponentialBackoffRetry extends SleepingRetry
{
    private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final int baseSleepTimeMs;
    private final int maxSleepMs;

    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
    {
        super(validateMaxRetries(maxRetries));
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @VisibleForTesting
    public int getBaseSleepTimeMs()
    {
        return baseSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        if ( sleepMs > maxSleepMs )
        {
            log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }

    private static int validateMaxRetries(int maxRetries)
    {
        if ( maxRetries > MAX_RETRIES_LIMIT )
        {
            log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            maxRetries = MAX_RETRIES_LIMIT;
        }
        return maxRetries;
    }
}
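It retries at most maxRetries times. If maxRetries exceeds MAX_RETRIES_LIMIT (29), it is replaced by MAX_RETRIES_LIMIT; otherwise it is left unchanged. Interestingly, the sleep time returned by getSleepTimeMs is not constant but random: it is baseSleepTimeMs multiplied by a random integer drawn from [0, 2^(retryCount + 1)), so the upper bound of the range grows as the retry count increases. If the result exceeds maxSleepMs (DEFAULT_MAX_SLEEP_MS, that is Integer.MAX_VALUE, when not specified), it is replaced by maxSleepMs. Because Math.max forces the multiplier to be at least 1, the sleep time is never less than baseSleepTimeMs.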
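The range of possible sleep times for each retry follows directly from the formula in getSleepTimeMs. The sketch below computes the lower and upper bounds for a few retry counts. BackoffBounds and its methods are hypothetical helpers written for this post, not part of Curator, and they use long arithmetic, so they do not model the int multiplication in the original code for very large values.

```java
/** Hypothetical helper (not part of Curator) that computes the bounds of the sleep-time formula. */
class BackoffBounds {
    /** Smallest possible sleep: the random multiplier is never below 1. */
    static long lowerBoundMs(int baseSleepTimeMs, int maxSleepMs) {
        return Math.min((long) baseSleepTimeMs, (long) maxSleepMs);
    }

    /** Largest possible sleep: the multiplier is at most 2^(retryCount + 1) - 1, then capped. */
    static long upperBoundMs(int baseSleepTimeMs, int retryCount, int maxSleepMs) {
        long maxMultiplier = (1L << (retryCount + 1)) - 1;
        return Math.min(baseSleepTimeMs * maxMultiplier, (long) maxSleepMs);
    }

    public static void main(String[] args) {
        for (int retryCount = 0; retryCount < 4; retryCount++) {
            System.out.println("retryCount=" + retryCount + " -> ["
                    + lowerBoundMs(1000, Integer.MAX_VALUE) + ", "
                    + upperBoundMs(1000, retryCount, Integer.MAX_VALUE) + "] ms");
        }
        // prints:
        // retryCount=0 -> [1000, 1000] ms
        // retryCount=1 -> [1000, 3000] ms
        // retryCount=2 -> [1000, 7000] ms
        // retryCount=3 -> [1000, 15000] ms
    }
}
```

With baseSleepTimeMs = 1000, the first retry always waits exactly one second, and each later retry widens the range the random wait is drawn from.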
BoundedExponentialBackoffRetry class (extends ExponentialBackoffRetry):
package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;

public class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry
{
    private final int maxSleepTimeMs;

    public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries)
    {
        super(baseSleepTimeMs, maxRetries);
        this.maxSleepTimeMs = maxSleepTimeMs;
    }

    @VisibleForTesting
    public int getMaxSleepTimeMs()
    {
        return maxSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return Math.min(maxSleepTimeMs, super.getSleepTimeMs(retryCount, elapsedTimeMs));
    }
}
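The maxSleepTimeMs field serves the same purpose as the maxSleepMs field: both cap the sleep time. The difference is that maxSleepTimeMs has no default value, so it must always be specified. In my opinion, the maxSleepTimeMs field is not really necessary.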
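The effect of the cap can be checked with a small simulation. The sketch below reimplements the capped formula in plain Java rather than calling Curator; BoundedBackoffSketch and sleepMs are hypothetical names, the multiplication is done in long arithmetic, and the sampled values vary from run to run because they are random.

```java
import java.util.Random;

/** Hypothetical re-implementation of the capped backoff formula (not Curator code). */
class BoundedBackoffSketch {
    /** Random exponential backoff, capped at maxSleepTimeMs. Expects 0 <= retryCount <= 29. */
    static long sleepMs(Random random, int baseSleepTimeMs, int maxSleepTimeMs, int retryCount) {
        long unbounded = (long) baseSleepTimeMs
                * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min(maxSleepTimeMs, unbounded);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int retryCount = 0; retryCount < 6; retryCount++) {
            // Every sample lies between baseSleepTimeMs (1000) and the cap (5000).
            System.out.println("retryCount=" + retryCount + " -> "
                    + sleepMs(random, 1000, 5000, retryCount) + " ms");
        }
    }
}
```

Since the three-argument ExponentialBackoffRetry constructor shown earlier already accepts a maxSleepMs, the same cap can apparently be expressed without the subclass, which is the point of the remark above.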
Connecting
package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.concurrent.TimeUnit;

public class Application {
    private static final String SERVER_PROXY = "192.168.1.184:9000";
    private static final int TIMEOUT = 40000;

    public static void main(String[] args) throws Exception {
        // Retry up to 3 times, starting from a base sleep time of 1000 ms.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .namespace("curator")
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(TIMEOUT)
                .sessionTimeoutMs(TIMEOUT)
                .build();
        curator.start();
        // The STARTED state only means start() has been called, not that a
        // connection exists, so wait for the connection explicitly.
        if (curator.blockUntilConnected(TIMEOUT, TimeUnit.MILLISECONDS)) {
            System.out.println("Connected!");
        }
    }
}
Output:
Connected!
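This is the Fluent-style API. The builder options used above are:
- connectString: the address of the ZooKeeper server.
- namespace: a namespace, similar to chroot; all subsequent operations on this client are relative to it.
- retryPolicy: the retry policy.
- connectionTimeoutMs: the connection timeout, in milliseconds.
- sessionTimeoutMs: the session timeout, in milliseconds.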
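To illustrate the chroot-like effect of the namespace option, here is a tiny, self-contained model of how a path given by the client relates to the path on the server. NamespaceSketch and toServerPath are hypothetical names for this illustration; the sketch only models the prefixing idea and is not Curator's actual path-handling code.

```java
/** Hypothetical model of namespace prefixing (illustration only, not Curator code). */
class NamespaceSketch {
    /** Maps a path as seen by the client to the path stored on the server. */
    static String toServerPath(String namespace, String clientPath) {
        // The client's root "/" corresponds to the namespace node itself.
        if (clientPath.equals("/")) {
            return "/" + namespace;
        }
        return "/" + namespace + clientPath;
    }

    public static void main(String[] args) {
        System.out.println(toServerPath("curator", "/app/config")); // /curator/app/config
        System.out.println(toServerPath("curator", "/"));           // /curator
    }
}
```

With the namespace "curator" from the example above, a client that works with /app/config is in effect working with /curator/app/config on the server.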
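Once connected, you can operate on the ZooKeeper server through Curator. That concludes this introduction to Curator's retry policies and Session API. If I got anything wrong or you see things differently, feel free to leave a comment.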