Zookeeper的操作
1.客户端脚本
? 进入Zookeeper的bin目录之后,直接执行下面的命令:  ? 出现下面的输出信息,就表示已经成功连接上本地的zookeeper服务器了。  ? 默认连接本地的zookeeper服务器,要是想连接指定的zookeeper服务器,通过下面的命令实现。
zkCli.sh -server ip:port
(1)创建节点 ? 使用create命令创建节点,用法如下:
create [-s] [-e] path data acl
? 其中,-s或-e分别指定节点特性:顺序或临时节点,默认情况下创建的是持久节点。path为其在zookeeper根节点下的路径,data该节点数据内容,acl为其权限。
例:在zookeeper根节点下创建一个叫作/zk-book,节点数据内容为“123”的节点。 
(2)读取 ? 使用ls命令列出指定节点下的所有子结点,与linux的ls指令是类似的。用法如下:
ls path [watch]

? 使用get命令获取指定节点的数据内容,用法如下:
get path [watch]
 ? 使用stat命令查看节点的属性信息,用法如下:
stat path [watch]
 (3)更新 ? 使用set命令更新指定节点的数据内容,用法如下:
set path data [version] 
? 可以看到这里节点/zk-book的数据内容更新为了"456",dataVersion也变成了1.
(4)删除 ? 使用delete命令删除指定节点,用法如下:
delete path [version]

? 对于包含子节点的节点,该命令无法成功删除
(5)退出 ? 使用quit命令即可退出

2.Java客户端API使用
导入jar包
? 在使用Java操作zookeeper之前,需要在Idea中导入zookeeper的jar包。 ? 首先打开Idea,依次点击File - Project Structure…
 ? 点击Modules - Dependencies - JARs or Directories… - +号  ? 找到自己安装zookeeper的路径下的lib包,导入即可。 
(1)创建会话
? 客户端可以通过创建一个zookeeper实例来连接zookeeper服务器。zookeeper的4种构造方法如下:
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher); #connectString:值zookeeper服务器列表,形如host:port #sessionTimeout:会话的超时时间 #watcher:Watcher事件通知处理器
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly); #canBeReadOnly:标识当前会话是否支持只读模式
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd); #sessionId:会话ID #sessionPasswd:会话密钥
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly);
? 下面用Java来创建一个基本的zookeeper会话实例。
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class Simple_session implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("192.168.1.12:2181", 5000, new Simple_session());
System.out.println(zooKeeper.getState());
try{
connectedSemaphore.await();
}catch(InterruptedException e){}
System.out.println("ZooKeeper session established.");
}
public void process(WatchedEvent event){
System.out.println("Receive watched event:" + event);
if(Event.KeeperState.SyncConnected == event.getState()){
connectedSemaphore.countDown();
}
}
}
? 成功建立了一个简单的会话,程序的输出结果如下:

(2)创建节点
? 客户端可以通过ZooKeeper的API来创建一个数据节点,有下面两个接口:
同步方式创建节点 String create(final String path, byte data[ ], List< ACL> acl, CreateMode createMode); #path:需要创建的数据节点的结点路径 #data[ ]:节点创建后的初始内容 #acl:权限 #createMode:节点类型
异步方式创建节点 void create(final String path, byte data[ ], List< ACL> acl, CreateMode createMode, StringCallback cb, Object ctx); #cb:异步回调函数 #ctx:用于传递一个对象,可以在回调方法执行的时候使用,通常放一个上下文信息
? 上面两个接口中,有一个CreateMode,其是一个枚举类型。
enum createMode{
PERSISTENT,
PERSISTENT_SEQUENTIAL,
EPHEMERAL,
EPHEMERAL_SEQUENTIAL;
}
? 下面使用同步API来创建一个节点。
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
public class Create_node implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zooKeeper = new ZooKeeper("192.168.1.12:2181", 5000, new Create_node());
connectedSemaphore.await();
String path1 = zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("Success create znode: " + path1);
String path2 = zooKeeper.create("/zk-test-ephemeral-", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("Success create znode: " + path2);
}
public void process(WatchedEvent event){
if(Event.KeeperState.SyncConnected == event.getState()){
connectedSemaphore.countDown();
}
}
}

(3)删除节点
? 客户端可以通过ZooKeeper的API来删除一个节点,有下面两个接口:
同步删除 public void delete(final String path, int version);
异步删除 public void delete(final String path, int version, VoidCallback cb, Object ctx);
(4)读取数据
? 读取数据包括了子节点列表的获取和节点数据的获取,ZooKeeper提供了不同API来获取数据。
1.获取子结点 ? 使用getChildren来获取所有子结点,具体的有8个接口,这里就不一一列出。 ? 接口中的参数有一个很重要的东西是,注册Watcher。如果ZooKeeper客户端在获取到指定节点的子节点列表后,还需要订阅这个子节点列表的变化通知,那么就可以通过注册一个Watcher来实现。当有子节点被添加或者删除时,服务端就会向客户端发送一个NodeChildrenChanged类型的事件通知。在服务端发送给客户端的事件通知中,不包含最新的节点列表,客户端必须主动重新进行获取。 ? 使用同步API获取子节点列表。
public class GetChildren_test implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
String path = "/zk-book";
zk = new ZooKeeper("192.168.1.12:2181", 5000, new GetChildren_test());
connectedSemaphore.await();
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path+"/c1", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
List<String> childrenList = zk.getChildren(path, true);
System.out.println(childrenList);
zk.create(path+"/c2", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event){
if(Event.KeeperState.SyncConnected == event.getState()){
if(Event.EventType.None == event.getType() && null == event.getPath()){
connectedSemaphore.countDown();
}else if(event.getType() ==Event.EventType.NodeChildrenChanged){
try{
System.out.println("ReGetChild:" + zk.getChildren(event.getPath(), true));
}catch(Exception e){}
}
}
}
}
? 运行结果如下:  2.getData ? 客户端可以通过ZooKeeper的API来获取一个节点的数据内容,有4个接口。 ? 还是来看一下注册Watcher,获取数据内容时是可以进行Watcher注册的,节点状态发生变更,服务端会向客户端发送一个NodeDataChanged的事件通知。
public class GetData_test implements Watcher {
private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
private static ZooKeeper zk = null;
private static Stat stat = new Stat();
public static void main(String[] args) throws Exception {
String path = "/zk-book";
zk = new ZooKeeper("192.168.1.12:2181", 5000, new GetData_test());
connectedSemaphore.await();
zk.create(path, "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(new String(zk.getData(path, true, stat)));
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "" + stat.getVersion());
zk.setData(path, "123".getBytes(), -1);
Thread.sleep(Integer.MAX_VALUE);
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && null == event.getPath()) {
connectedSemaphore.countDown();
} else if (event.getType() == Event.EventType.NodeChildrenChanged) {
try {
System.out.println(new String(zk.getData(event.getPath(), true, stat)));
System.out.println(stat.getCzxid() + "," + stat.getMzxid() + "," + stat.getVersion());
} catch (Exception e) {
}
}
}
}
}
|