zookeeper 客户端curator
简介
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。 注意:如果您希望将 Curator 与 ZooKeeper 3.4.x 一起使用,您应该固定到 Curator 的 4.2.x 版。 【官网】http://curator.apache.org/index.html
mvn依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.12.0</version>
</dependency>
静态工程启动curator
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory
.newClient(server.getConnectString(), 5000, 5000, retryPolicy);
client.start();
重试策略RetryPolicy
- ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
- RetryNTimes:指定最大重试次数的重试策略
- RetryOneTime:仅重试一次
- RetryUntilElapsed:一直重试直到达到规定的时间
第一个参数为baseSleepTimeMs初始的sleep时间,用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。
使用Fluent风格api创建
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy).build();
client.start();
创建节点
节点状态:
- PERSISTENT:持久化
- PERSISTENT_SEQUENTIAL:持久化并且带序列号
- EPHEMERAL:临时
- EPHEMERAL_SEQUENTIAL:临时并且带序列号
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath("/demo/t1", "t111".getBytes());
删除节点
client.delete()
.guaranteed()
.deletingChildrenIfNeeded()
.withVersion(10086)
.forPath("/demo");
读取节点
byte[] bytes = client.getData().forPath("/demo/t1");
System.out.println(new String(bytes));
Stat stat=new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath("/demo/t1");
System.out.println(stat.toString());
修改节点
client.setData().forPath("/demo/t1", "t222".getBytes());
事务
client.inTransaction()
以上为同步接口, curator还提供了异步接口
异步接口
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode()));
}, executor)
.forPath("/demo/t1");
spring集成方案
@Bean
public CuratorFramework curatorFramework(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(host)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectTimeout)
.retryPolicy(retryPolicy)
.namespace(namespace)
.build();
client.start();
return client;
}
@PreDestroy
private void destroyClient(){
curatorFramework().close();
log.info("==================关闭成功==================");
}
重试机制解决了但是会引发另一个问题,执行器zookeeper进程阻塞导致session超时会话断开,注册中心的执行器信息临时节点会丢失,重试机制启用重连策略,连接成功但是执行器信息已经不在zookeeper中了,为了解决这个问题引入客户端连接状态监听机制(即ConnectionStateListener使用)。有了解决方案在curator客户端中添加connection状态监听,当充实机制重连成功后,需要把执行器信息重新注册到注册中心zookeeper上。代码如下:
@Component
public class ZookeeperConnectionListener implements ConnectionStateListener {
private final Logger log = LoggerFactory.getLogger(ZookeeperConnectionListener.class);
@Autowired
private Environment environment;
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
if(connectionState == ConnectionState.RECONNECTED){
String port = environment.getProperty("server.port");
int hostPort = 0;
if (StringUtils.isNotBlank(port)) {
hostPort = Integer.valueOf(port);
}
ExcutorEntity excutorEntity = ExcutorHelper.getExcutorEntity(hostPort);
try {
Stat stat = curatorFramework.checkExists().forPath(ExcutorHelper.getPath(excutorEntity));
if(stat == null){
ExcutorHelper.registerExcutor(curatorFramework,hostPort);
}
} catch (Exception e) {
log.error("注册执行器节点失败:",e);
}
}
}
}
参考:Zookeeper客户端Curator使用详解
|