public class ZookeeperAPITest {
/**
* 测试 zookeeper,创建永久节点
* @throws Exception
*/
@Test
public void createZnode() throws Exception {
/**
* 定制一个重试策略
* param1:重试的时间间隔
* param2:重试的最大次数
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
/**
* 获取一个客户端对象
* param1:要连接的Zookeeper服务器列表
* param2:会话的超时时间
* param3:链接超时时间
* param4:重试策略
*/
String connectionStr = "192.168.139.201:2181,192.168.139.202:2181,192.168.139.203:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
/**
* 开启客户端
*/
client.start();
/**
* 创建永久节点
*/
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/hello2","world".getBytes());
/**
* 关闭客户端
*/
client.close();
}
/**
* 创建临时节点
* @throws Exception
*/
@Test
public void createTmpZnode() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);
String connectionStr = "192.168.139.201:2181,192.168.139.202:2181,192.168.139.203:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
client.start();
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/hello5","world".getBytes());
Thread.sleep(5000);
client.close();
}
/**
* 修改节点数据
* @throws Exception
*/
@Test
public void setZnodeData() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
String connectionStr = "192.168.139.201:2181,192.168.139.202:2181,192.168.139.203:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
client.start();
client.setData().forPath("/hello","haha".getBytes());
client.close();
}
/**
* 查询节点数据
* @throws Exception
*/
@Test
public void getZnodeData() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1);
String connectionStr = "192.168.139.201:2181,192.168.139.202:2181,192.168.139.203:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
client.start();
byte[] bytes = client.getData().forPath("/hello");
System.out.println(new String(bytes));
client.close();
}
/**
* zookeeper 的 watch 机制
* @throws Exception
*/
@Test
public void watchZnode() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1);
String connectionStr = "192.168.139.201:2181,192.168.139.202:2181,192.168.139.203:2181";
CuratorFramework client = CuratorFrameworkFactory.newClient(connectionStr, 8000, 8000, retryPolicy);
client.start();
/**
* 创建一个TreeCache对象,指定要监控的节点路径
*/
TreeCache treeCache = new TreeCache(client, "/hello");
/**
* 自定义一个监听器
*/
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
ChildData data = treeCacheEvent.getData();
if (data != null) {
switch (treeCacheEvent.getType()) {
case NODE_ADDED:
System.out.println("监测到有新增节点");
break;
case NODE_REMOVED:
System.out.println("监测到有节点被移除");
break;
case NODE_UPDATED:
System.out.println("监测到节点被更新");
break;
default:
break;
}
}
}
});
/**
* 开始监听
*/
treeCache.start();
Thread.sleep(100000);
client.close();
}
}
|