IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Zookeeper客户端使用与集群特性详解 -> 正文阅读

[大数据]Zookeeper客户端使用与集群特性详解

一、Zookeeper Java 客户端

1、项目构建

zookeeper 官方的客户端没有和服务端代码分离,他们为同一个jar 文件,所以我们直接引入zookeeper的maven即可, 这里版本请保持与服务端版本一致,不然会有很多兼容性的问题。


    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.8</version>
        </dependency>
    </dependencies>
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ZkApplication {
    public static void main(String[] args) {
        SpringApplication.run(ZkApplication.class, args);
    }
}

2、创建客户端实例:ZK配置中心实战

我们这里模拟一个zk设计的配置中心:

配置类:

import lombok.*;

@Data
@ToString
@NoArgsConstructor
@Builder
@AllArgsConstructor
public class MyConfig {
    private String key;
    private String name;
}

配置中心类:

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.*;
import org.jihu.config.MyConfig;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ConfigCenter {
    private static final String CONNECT_STR = "192.168.131.171";

    private static final Integer SESSION_TIMEOUT = 30 * 1000;

    private static ZooKeeper zooKeeper = null;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        /**
         * Zookeeper连接成功后会触发一个事件,状态是SyncConnected,类型为None
         *  WatchedEvent state:SyncConnected type:None path:null,
         *
         *  所以我们只要监听到这个事件就说明连接成功了
         */
        // 注意,zooKeeper的建立默认是异步的,会启动其他线程去建立
        /**
         *  public void start() {
         *         sendThread.start();
         *         eventThread.start();
         *     }
         *
         *  这里的sendThread和eventThread都是守护线程,如果没有业务线程,就会退出。所以如果我们直接就这样创建,
         *  可能我们的连接还没建立起来,整个程序就结束了
         *
         *  这里我们需要使用CountDownLatch
         */
        zooKeeper = new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 根据事件类型判断是否连接成功
                if (event != null && event.getType().equals(Watcher.Event.EventType.None)
                        && event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                    log.info("zk 连接成功...");
                    // 当连接建立好之后
                    countDownLatch.countDown();
                }
            }
        });

        // 阻塞直到count == 0
        countDownLatch.await();

        // 执行到此处,表明连接已经建立好了

        // ======== 将配置存储到zk中 ==========

        MyConfig config = MyConfig.builder().key("anyKey").name("anyName").build();

        ObjectMapper objectMapper = new ObjectMapper();
        byte[] configBytes = objectMapper.writeValueAsBytes(config);

        // 创建zk节点
        final String configPath = "/myconfig";
        try {
            String result = zooKeeper.create(configPath, configBytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            log.error("{}", e);
        }


        /**
         * 循环监听
         */
        Watcher watcher = new Watcher() {
            @SneakyThrows
            @Override
            public void process(WatchedEvent event) {
                // 获取到监听事件之后
                // 判断路径是否是配置路径,并且事件类型是NodeDataChanged
                if (configPath.equals(event.getPath())
                        && Watcher.Event.EventType.NodeDataChanged.equals(event.getType())) {
                    log.info("PATH: {}, 发生了数据变化.", event.getType());
                    // 获取数据并添加监听
                    byte[] data = zooKeeper.getData(configPath, this, null);
                    log.info("数据发生变化: {}", objectMapper.readValue(new String(data), MyConfig.class));
                }
            }
        };

        // 获取数据并添加监听
        byte[] oldData = zooKeeper.getData(configPath, watcher, null);
        MyConfig oldConfig = objectMapper.readValue(new String(oldData), MyConfig.class);
        log.info("原始数据:{}", oldConfig);

        // 不要让程序结束. 然后我们在客户端中去修改数据
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

注意:上面代码书写的时候,有一些需要注意的地方。大家需要仔细看看代码中的注释!!!


然后我们启动,项目,查看日志,可以看到,zk已经连接成功了,也成功设置和读取了配置文件。
在这里插入图片描述
我们来查看一下zk,发现数据也已经成功添加了。
在这里插入图片描述
下面我们来验证监听,我们在zk客户端中修改数据,看看日志中是否可以监听到:

在这里插入图片描述
在这里插入图片描述
可以看到,java客户端已经成功监听到数据的变化,并读取数据和添加了新的监听,完成了循环监听。

3、创建Zookeeper实例的方法

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, HostProvider, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly, ZKClientConfig)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[])
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long, byte[], boolean, HostProvider, ZKClientConfig)
ZooKeeper(String connectString, int  sessionTimeout, Watcher watcher, long, byte[], boolean)

在这里插入图片描述

4、常用操作

为了方便演示,我们使用Junit进行测试:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.jihu.config.MyConfig;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ZkTest1 {

    private static final String CONNECT_STR = "192.168.131.171";

    private static final Integer SESSION_TIMEOUT = 30 * 1000;

    private static ZooKeeper zooKeeper = null;

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    private static ObjectMapper objectMapper = new ObjectMapper();

    // 初始化zk
    @Before
    public void init() throws IOException {
        log.info("try to connect to zk server");
        zooKeeper = new ZooKeeper(CONNECT_STR, SESSION_TIMEOUT, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 根据事件类型判断是否连接成功
                if (event != null && event.getType().equals(Watcher.Event.EventType.None)
                        && event.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                    // 当连接建立好之后
                    countDownLatch.countDown();
                    log.info("zk 连接成功...");
                }
            }
        });
    }


    /**
     * 循环监听
     */
    private Watcher createDataChangeWatch(String path) {
        return new Watcher() {
            @SneakyThrows
            @Override
            public void process(WatchedEvent event) {
                // 获取到监听事件之后
                // 判断路径是否是配置路径,并且事件类型是NodeDataChanged
                if (path.equals(event.getPath())
                        && Watcher.Event.EventType.NodeDataChanged.equals(event.getType())) {
                    log.info("PATH: {}, 发生了数据变化.", event.getType());
                    // 获取数据并添加监听
                    byte[] data = zooKeeper.getData(path, this, null);
                    log.info("数据发生变化: {}", objectMapper.readValue(new String(data), MyConfig.class));
                }
            }
        };
    }

   // ============== 测试方法开始 ================


   // ===============测试方法结束 ==============

	// 不要让程序结束
    @After
    public void holdOnzk() throws InterruptedException {
        // 不要让程序结束. 然后我们在客户端中去修改数据
        TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
    }
}

1、同步创建节点:

@Test
public void createTest() throws KeeperException, InterruptedException {
    String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    log.info("created path: {}",path);
}

2、异步创建节点:

public void create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode, StringCallback cb, Object ctx)

StringCallback 是回调。

ctx是回调函数关联的对象,可以传进去。

@Test
    public void createAsycTest() throws InterruptedException {
        String zkNode = "/t_createAsyc";
        zooKeeper.create(zkNode, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT, (rc, path, ctx, name) -> {
                    String threadName = Thread.currentThread().getName();
                    log.info("threadName:{}, rc  {},path {},ctx {},name {}",threadName, rc,path,ctx,name);

                }, "context");

        log.info("over");
    }

在这里插入图片描述
从执行顺序来看,主线程的log打印出来的时候,异步线程还在进行中。

3、修改节点数据:

@Test
public void setTest() throws KeeperException, InterruptedException {

    Stat stat = new Stat();
    byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改前: {}",new String(data));
    zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
     byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
    log.info("修改后: {}",new String(dataAfter));
}

4、乐观锁修改数据

我们在设置数据的时候,每一次都会将dataversion进行递增,所以我们可以使用这个字段来实现乐观锁修改数据:

@Test
    public void optimisticUpdate() {
        String zkNode = "/t_optimisticUpdate";
        // 先创建一个节点
        try {
            zooKeeper.create(zkNode, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException | InterruptedException e) {
            log.error("节点创建失败: {}", e);
        }
        // 获取节点信息包括version
        Stat stat = new Stat();
        try {
            byte[] data = zooKeeper.getData(zkNode, false, stat);
        } catch (KeeperException | InterruptedException e) {
            log.error("节点获取失败: {}", e);
        }

        int version1 = stat.getVersion();
        log.info("第一次的版本号:{}", version1);

        // 乐观锁修改
        try {
            Stat stat1 = zooKeeper.setData(zkNode, "data2".getBytes(), version1);
            log.info("第一次修改成功.版本号:{}", version1);
        } catch (KeeperException | InterruptedException e) {
            log.error("修改节点失败. version {}: {}", version1, e.getMessage());
        }
        // 乐观锁修改2, 此时传入旧的版本号,会修改失败
        try {
            Stat stat2 = zooKeeper.setData(zkNode, "data3".getBytes(), version1);
        } catch (KeeperException | InterruptedException e) {
            log.error("修改节点失败. version {}: {}", version1, e.getMessage());
        }

        byte[] data1 = new byte[0];
        try {
            data1 = zooKeeper.getData(zkNode, false, stat);
            int version2 = stat.getVersion();
            log.info("当前数据数据是:{}. version: {}", new String(data1), version2);
        } catch (KeeperException | InterruptedException e) {
            log.error("节点获取失败: {}", e);
        }
    }

在这里插入图片描述
可以看到,如果版本号不正确的话,就会修改失败!而且每一次成功修改之后,版本号都会递增:
在这里插入图片描述
注意:多个线程同时修改数据,在zk的Server端会被放到一个队列中同步进行,所以不会存在并发问题。

5、删除节点

删除节点的时候也需要一个版本号,但是如果我们传-1的话,就会直接删除。

@Test
    public void testDelete() {
        String zkNode = "/t_optimisticUpdate";
        try {
            zooKeeper.delete(zkNode, -1);
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

二、 Curator

Curator是zk的客户端框架。

1、什么是Curator

Curator 是一套由netflix 公司开源的,Java 语言编程的 ZooKeeper 客户端框架,Curator项目是现在ZooKeeper 客户端中使用最多,对ZooKeeper 版本支持最好的第三方客户端,并推荐使用,Curator 把我们平时常用的很多 ZooKeeper 服务开发功能做了封装,例如 Leader 选举、分布式计数器、分布式锁。

这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。在会话重新连接、Watch 反复注册、多种异常处理等使用场景中,用原生的 ZooKeeper 处理比较复杂。而在使用 Curator 时,由于其对这些功能都做了高度的封装,使用起来更加简单,不但减少了开发时间,而且增强了程序的可靠性。

三、Curator 实战

这里我们以 Maven 工程为例,首先要引入Curator 框架相关的开发包,这里为了方便测试引入了junit ,lombok,由于Zookeeper本身以来了 log4j 日志框架,所以这里可以创建对应的log4j配置文件后直接使用。

如下面的代码所示,我们通过将 Curator 相关的引用包配置到 Maven 工程的 pom 文件中,将 Curaotr 框架引用到工程项目里,在配置文件中分别引用了两个 Curator 相关的包,第一个是 curator-framework 包,该包是对 ZooKeeper 底层 API 的一些封装。另一个是 curator-recipes 包,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.11.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.8</version>
        </dependency>

        <!-- 装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 该包是对 ZooKeeper 底层 API 的一些封装 -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.0.0</version>
        </dependency>
    </dependencies>

1、会话创建

要进行客户端服务器交互,第一步就要创建会话。

Curator 提供了多种方式创建会话,比如用静态工厂方式创建:

// 重试策略 
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

或者使用 fluent 风格创建:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("192.168.128.129:2181")
                .sessionTimeoutMs(5000)  // 会话超时时间
                .connectionTimeoutMs(5000) // 连接超时时间
                .retryPolicy(retryPolicy)
                .namespace("base") // 包含隔离名称
                .build();
client.start();

这段代码的编码风格采用了流式方式,最核心的类是 CuratorFramework 类,该类的作用是定义一个 ZooKeeper 客户端对象,并在之后的上下文中使用。在定义 CuratorFramework 对象实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,并指定了 connectionString 服务器地址列表、retryPolicy 重试策略 、sessionTimeoutMs 会话超时时间、connectionTimeoutMs 会话创建超时时间。下面我们分别对这几个参数进行讲解:

  • connectionString:服务器地址列表,在指定服务器地址列表的时候可以是一个地址,也可以是多个地址。如果是多个地址,那么每个服务器地址列表用逗号分隔, 如 host1:port1,host2:port2,host3;port3 。
  • retryPolicy:重试策略,当客户端异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 ZooKeeper 服务端。而 Curator 提供了 一次重试、多次重试等不同种类的实现方式。在 Curator 内部,可以通过判断服务器返回的 keeperException 的状态代码来判断是否进行重试处理,如果返回的是 OK 表示一切操作都没有问题,而 SYSTEMERROR 表示系统或服务端错误。
    在这里插入图片描述
  • 超时时间:Curator 客户端创建过程中,有两个超时时间的设置。一个是 sessionTimeoutMs 会话超时时间,用来设置该条会话在 ZooKeeper 服务端的失效时间。另一个是 connectionTimeoutMs 客户端创建会话的超时时间,用来限制客户端发起一个会话连接到接收 ZooKeeper 服务端应答的时间。sessionTimeoutMs 作用在服务端,而 connectionTimeoutMs 作用在客户端。

2、创建节点

创建节点的方式如下面的代码所示,描述一个节点要包括节点的类型,即临时节点还是持久节点、节点的数据信息、节点是否是有序节点等属性和性质。

 @Test
public void testCreate() throws Exception {
    String path = curatorFramework.create().forPath("/curator-node");
    curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/curator-node","some-data".getBytes())
    log.info("curator create node :{}  successfully.",path);
}

在 Curator 中,可以使用 create 函数创建数据节点,并通过 withMode 函数指定节点类型(持久化节点,临时节点,顺序节点,临时顺序节点,持久化顺序节点等),默认是持久化节点,之后调用 forPath 函数来指定节点的路径和数据信息。

一次性创建带层级结构的节点

@Test
public void testCreateWithParent() throws Exception {
    String pathWithParent="/node-parent/sub-node-1";
    String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
    log.info("curator create node :{}  successfully.",path);
}

3、获取数据

@Test
public void testGetData() throws Exception {
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("get data from  node :{}  successfully.",new String(bytes));
}

4、更新节点

我们通过客户端实例的 setData() 方法更新 ZooKeeper 服务上的数据节点,在setData 方法的后边,通过 forPath 函数来指定更新的数据节点路径以及要更新的数据。

@Test
public void testSetData() throws Exception {
    curatorFramework.setData().forPath("/curator-node","changed!".getBytes());
    byte[] bytes = curatorFramework.getData().forPath("/curator-node");
    log.info("get data from  node /curator-node :{}  successfully.",new String(bytes));
}

5、删除节点

@Test
public void testDelete() throws Exception {
    String pathWithParent="/node-parent";
    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}
  • guaranteed:该函数的功能如字面意思一样,主要起到一个保障删除成功的作用,其底层工作方式是:只要该客户端的会话有效,就会在后台持续发起删除请求,直到该数据节点在ZooKeeper 服务端被删除。
  • deletingChildrenIfNeeded:指定了该函数后,系统在删除该数据节点的时候会以递归的方式直接删除其子节点,以及子节点的子节点。

6、异步接口

Curator 引入了BackgroundCallback 接口,用来处理服务器端返回来的信息,这个处理过程是在异步线程中调用,默认在 EventThread 中调用,也可以自定义线程池。

public interface BackgroundCallback
{
    /**
     * Called when the async background operation completes
     *
     * @param client the client
     * @param event operation result details
     * @throws Exception errors
     */
    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

如上接口,主要参数为 client 客户端, 和 服务端事件 event .

inBackground 异步处理默认在EventThread中执行.

@Test
public void test() throws Exception {
    curatorFramework.getData().inBackground((item1, item2) -> {
        log.info(" background: {}", item2);
    }).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

7、指定线程池

@Test
public void test() throws Exception {
    ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    curatorFramework.getData().inBackground((item1, item2) -> {
        log.info(" background: {}", item2);
    },executorService).forPath(ZK_NODE);

    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

8、Curator 监听器:

/**
 * Receives notifications about errors and background events
 */
public interface CuratorListener
{
    /**
     * Called when a background task has completed or a watch has triggered
     *
     * @param client client
     * @param event the event
     * @throws Exception any errors
     */
    public void         eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

针对 background 通知和错误通知。使用此监听器之后,调用inBackground 方法会异步获得监听。

四、Curator Caches

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件监听,Cache 事件监听可以理解为一个本地缓存视图与远程 Zookeeper 视图的对比过程。Cache 提供了反复注册的功能。

Cache 分为两类注册类型:节点监听和子节点监听。

1、node cache

NodeCache 对某一个节点进行监听:

public NodeCache(CuratorFramework client,
                         String path)
Parameters:
client - the client
path - path to cache

可以通过注册监听器来实现,对当前节点数据变化的处理。

public void addListener(NodeCacheListener listener)
     Add a change listener
Parameters:
listener - the listener
@Slf4j
public class NodeCacheTest extends AbstractCuratorTest{

    public static final String NODE_CACHE="/node-cache";

    @Test
    public void testNodeCacheTest() throws Exception {

        createIfNeed(NODE_CACHE);
        NodeCache nodeCache = new NodeCache(curatorFramework, NODE_CACHE);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                log.info("{} path nodeChanged: ",NODE_CACHE);
                printNodeData();
            }
        });

        nodeCache.start();
    }


    public void printNodeData() throws Exception {
        byte[] bytes = curatorFramework.getData().forPath(NODE_CACHE);
        log.info("data: {}",new String(bytes));
    }
}

2、path cache

PathChildrenCache 会对子节点进行监听,但是不会对二级子节点进行监听。

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点数据变化的处理。

public void addListener(PathChildrenCacheListener listener)
     Add a change listener
Parameters:
listener - the listener
@Slf4j
public class PathCacheTest extends AbstractCuratorTest{

    public static final String PATH="/path-cache";

    @Test
    public void testPathCache() throws Exception {

        createIfNeed(PATH);
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, PATH, true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                log.info("event:  {}",event);
            }
        });

        // 如果设置为true则在首次启动时就会缓存节点内容到Cache中
        pathChildrenCache.start(true);
    }
}

3、tree cache

TreeCache 使用一个内部类TreeNode来维护这个一个树结构。并将这个树结构与ZK节点进行了映射。所以TreeCache 可以监听当前节点下所有节点的事件。

public TreeCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

可以通过注册监听器来实现,对当前节点的子节点,及递归子节点数据变化的处理。

public void addListener(TreeCacheListener listener)
     Add a change listener
Parameters:
listener - the listener
@Slf4j
public class TreeCacheTest extends AbstractCuratorTest{

    public static final String TREE_CACHE="/tree-path";

    @Test
    public void testTreeCache() throws Exception {
        createIfNeed(TREE_CACHE);
        TreeCache treeCache = new TreeCache(curatorFramework, TREE_CACHE);
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                log.info(" tree cache: {}",event);
            }
        });
        treeCache.start();
    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-13 17:31:50  更:2021-07-13 17:32:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/1 14:46:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码