IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> zookeeper 客户端curator -> 正文阅读

[大数据]zookeeper 客户端curator

zookeeper 客户端curator

简介

Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。
注意:如果您希望将 Curator 与 ZooKeeper 3.4.x 一起使用,您应该固定到 Curator 的 4.2.x 版。
【官网】http://curator.apache.org/index.html

mvn依赖

<!-- 对zookeeper的底层api的一些封装 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- 测试客户端,可用不启用zookeeper -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
</dependency>

静态工程启动curator

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//使用测试用
final TestingServer server = new TestingServer();
CuratorFramework client = CuratorFrameworkFactory
        .newClient(server.getConnectString(), 5000, 5000, retryPolicy);
//启动客户端
client.start();

重试策略RetryPolicy

  • ExponentialBackoffRetry:重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
  • RetryNTimes:指定最大重试次数的重试策略
  • RetryOneTime:仅重试一次
  • RetryUntilElapsed:一直重试直到达到规定的时间

第一个参数为baseSleepTimeMs初始的sleep时间,用于计算之后的每次重试的sleep时间。第二个参数为maxRetries,最大重试次数。

使用Fluent风格api创建

CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString(server.getConnectString())
        .sessionTimeoutMs(5000)
        .connectionTimeoutMs(5000)
        .retryPolicy(retryPolicy).build();
client.start();

创建节点

节点状态:

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且带序列号
  • EPHEMERAL:临时
  • EPHEMERAL_SEQUENTIAL:临时并且带序列号
client.create()
        //如果父节点不存在则创建
        .creatingParentContainersIfNeeded()
        //节点类型
        .withMode(CreateMode.PERSISTENT).forPath("/demo/t1", "t111".getBytes());

删除节点

client.delete()
                .guaranteed() //强制删除 在后台持续进行删除操作,直到删除节点成功。
                .deletingChildrenIfNeeded() // 递归删除子节点
                .withVersion(10086) // 指定删除的版本号
                .forPath("/demo");

读取节点

byte[] bytes = client.getData().forPath("/demo/t1");
System.out.println(new String(bytes));
//读取state
Stat stat=new Stat();
byte[] bytes = client.getData().storingStatIn(stat).forPath("/demo/t1");
System.out.println(stat.toString());

修改节点

client.setData().forPath("/demo/t1", "t222".getBytes());

事务

client.inTransaction()

以上为同步接口, curator还提供了异步接口

异步接口

Executor executor = Executors.newFixedThreadPool(2);
client.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.EPHEMERAL)
        .inBackground((curatorFramework, curatorEvent) -> {
            System.out.println(String.format("eventType:%s,resultCode:%s", curatorEvent.getType(), curatorEvent.getResultCode()));
        }, executor)
        .forPath("/demo/t1");

spring集成方案

@Bean
public CuratorFramework curatorFramework(){
    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
    CuratorFramework client =
            CuratorFrameworkFactory.builder()
                    .connectString(host)
                    .sessionTimeoutMs(sessionTimeout)
                    .connectionTimeoutMs(connectTimeout)
                    .retryPolicy(retryPolicy)
                    .namespace(namespace)
                    .build();
    client.start();
    return client;
}
@PreDestroy
private void destroyClient(){
    curatorFramework().close();
    log.info("==================关闭成功==================");
}

重试机制解决了但是会引发另一个问题,执行器zookeeper进程阻塞导致session超时会话断开,注册中心的执行器信息临时节点会丢失,重试机制启用重连策略,连接成功但是执行器信息已经不在zookeeper中了,为了解决这个问题引入客户端连接状态监听机制(即ConnectionStateListener使用)。有了解决方案在curator客户端中添加connection状态监听,当充实机制重连成功后,需要把执行器信息重新注册到注册中心zookeeper上。代码如下:

@Component
public class ZookeeperConnectionListener implements ConnectionStateListener {
    private final Logger log = LoggerFactory.getLogger(ZookeeperConnectionListener.class);
    @Autowired
    private Environment environment;
    @Override
    public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
        if(connectionState == ConnectionState.RECONNECTED){ //监控重新连接的状态,补偿执行器注册
            String port = environment.getProperty("server.port"); //获取执行器端口
            int hostPort = 0;
            if (StringUtils.isNotBlank(port)) {
                hostPort = Integer.valueOf(port);
            }
            ExcutorEntity excutorEntity = ExcutorHelper.getExcutorEntity(hostPort); //构造执行器注册信息
            try {
                Stat stat = curatorFramework.checkExists().forPath(ExcutorHelper.getPath(excutorEntity)); //检查执行器信息状态
                if(stat == null){
                    ExcutorHelper.registerExcutor(curatorFramework,hostPort); //重新注册执行器信息
                }
            } catch (Exception e) {
                log.error("注册执行器节点失败:",e);
                //重试
            }
        }
    }
}

参考:Zookeeper客户端Curator使用详解

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-18 12:46:21  更:2021-08-18 12:47:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 20:04:57-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码