可以看到Zookeeper的源码中总共提供了四个构造函数,参数从三个到六个不等,我们分别来看看这几个参数的说明:
connectString
connectString代表Zookeeper的服务器列表,每个zk服务器使用host:port字符串组成,多个zk服务器之间使用英文输入法的逗号分隔,需要注意的是我们还可以在连接中指定根目录,例如
192.168.1.3 :2181/zk-root,则当客户端连接上当前zk服务器的时候,后续所有的操作都会基于当前目录作为根目录进行操作
sessionTimeout
sessionTimeout指的是会话时间,单位是毫秒,zk引入了会话的概念,在一个会话周期内,zk会通过心跳检测机制来维持会话的有效性,一旦在当前指定的时间内没有完成心跳检测,则视为当前会话已经失效
watcher
watcher是zk中提供的事件通知机制,在zk中允许传入当前类型的接口实例来作为客户端默认的事件通知处理器,如果我们当前不需要引入事件通知机制,当前参数传递null即可
canBeReadOnly
canBeReadOnly是zk中提供的一个保护机制,默认情况下zk集群如果出现大半的机器失去网络连接(宕机),那么zk将进入保护,不再处理任何客户端请求(包括读),但是如果我们需要继续提供读服务操作,则需要将当前的参数设置为true
aHostProvider
aHostProvider是zk提供的支持客户端的自定义行为操作,内部提供next、onConnected以及updateServerList等操作
clientConfig
clientConfig是zk提供的用来灵活配置的配置类,可以将连接的信息配置进去,例如zk超时时间,连接方式,连接信息等
了解了Zookeeper的构造函数,我们来编写第一个demo,创建一个zk会话实例:
/**
- 会话连接demo
*/ public class SessionDemo implements Watcher { private static CountDownLatch latch = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException { ZooKeeper zooKeeper = new ZooKeeper(“127.0.0.1:2181”,5000,new SessionDemo()); System.out.println(zooKeeper.getState()); latch.await(); }
@Override public void process(WatchedEvent event) { System.out.println(“触发了监听事件:”+event); //如果当前是异步连接状态 if(Event.KeeperState.SyncConnected == event.getState()){ latch.countDown(); } } }
点击运行以后,可以看到控制台的输出:
CONNECTING 触发了监听事件:WatchedEvent state:SyncConnected type:None path:null
创建节点
zookeeper提供了两种创建节点的方式,一种是同步创建,一种是异步,我们首先来看看create方法:
//创建同步节点 public String create(final String path, byte data[], List acl,CreateMode createMode)throws KeeperException, InterruptedException
//创建异步节点 public void create(final String path, byte data[], List acl,CreateMode createMode, StringCallback cb, Object ctx)
可以看到异步创建节点的接口多了两个参数,一个是callback回调,一个是传递的object对象,接下来我们针对这些参数进行一下说明:
path
path是创建zk节点的时候指定的对应路径,如:/root
data[]
**data[]**是创建节点的时候对应存储的内容
acl
acl是创建节点的时候的策略
createMode
createMode是创建节点的时候指定的节点类型,通常指定四种类型(实际不止):
1.持久化节点–PERSISTENT
2.持久化顺序节点–PERSISTENT _ S E QUENTIAL
3.临时节点–EPHEMERAL
4.临时顺序节点-- EPHEMERAL _ S EQUENTIAL
cb
cb是注册一个异步回调函数,类型为StringCallback,一般重写void processResult( int rc , String path , Object ctx , String n ame ) ;当zk创建节点完毕以后,会自动调用这个方法,可以在当前方法内处理对应的业务逻辑
ctx
ctx在创建节点的时候可以传递的对象,在StringCallback的回调函数执行的时候使用(传递),通常会传递上下文(Context)
注意:这里需要注意的是,zk默认创建的时候不会给我们进行序列化,需要我们手动序列化转换为字节数组传递节点存储的内容,并且zk默认不允许跨节点创建,即如果上层路径不存在的情况下,直接创建子节点,并且如果当前路径对应节点已经创建,再去创建并不会覆盖原来的内容,会抛出NodeExistsException异常
创建同步节点
接下来,我们来创建一个同步节点:
/** *创建同步节点demo **/ public class CreateDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new CreateDemo()); LATCH.await(); //等待连接上zk以后 String path1 = zk.create("/test01", “test01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建无序节点:”+path1);
String path2 = zk.create("/test02", “test02”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(“创建有序节点:”+path2); }
@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }
我们来运行一下,可以看到控制台输出的内容:
创建无序节点:/test01 创建有序节点:/test020000000002
创建异步节点
创建异步节点,我们需要注意传递ctx参数,以及StringCallback的实例,并且我们需要注意的是,create方法并不会阻塞后续业务执行:
/** *创建异步节点demo **/ public class CreateAyncDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new CreateAyncDemo()); LATCH.await(); //等待连接上zk以后 zk.create("/testAsyn001", “testAsyn001”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new IStringCallback(),“我是上下文”);
zk.create("/testAsyn002", “testAsyn002”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new IStringCallback(),“我是上下文”); Thread.sleep(Integer.MAX_VALUE); }
@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } }
} class IStringCallback implements AsyncCallback.StringCallback{ @Override public void processResult(int rc, String path, Object ctx, String name) { System.out.println(“创建节点完毕的回调函数:[rc:”+rc+",path:"+path+",ctx:"+ctx+",name:"+name+"]"); } }
我们使用Thread.sleep阻塞一下当前的业务,运行可以看到控制台成功输出:
创建节点完毕的回调函数:[rc:0,path:/testAsyn001,ctx:我是上下文,name:/testAsyn001] 创建节点完毕的回调函数:[rc:0,path:/testAsyn002,ctx:我是上下文,name:/testAsyn0020000000009]
删除节点
客户端通过delete方法来删除某一个节点,delete方法如下:
//同步删除 public void delete(final String path, int version) throws InterruptedException, KeeperException //异步删除 public void delete(final String path, int version, VoidCallback cb,Object ctx)
可以看到删除节点操作和创建节点一样,支持同步和异步操作,但是这里我们需要注意的是,**在Zookeeper中,只运行按照顺序删除节点,即子节点存在无法直接删除父节点,必须优先删除下面的所有子节点!**接下来我们编写一个案例,删除节点:
public class DeleteDemo implements Watcher { private stat
《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
【docs.qq.com/doc/DSkNLaERkbnFoS0ZF】 完整内容开源分享
ic final CountDownLatch LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”, 5000,new DeleteDemo()); LATCH.await(); //等待连接上zk以后 String path1 = zk.create("/test01", “test01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建无序节点:”+path1);
String path2 = zk.create("/test02", “test02”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(“创建有序节点:”+path2); //触发同步删除 zk.delete("/test01",0); //触发异步删除 zk.delete("/test02",0,new IVoidCallback(),“我是触发异步删除的数据”); Thread.sleep(Integer.MAX_VALUE); }
@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }
class IVoidCallback implements AsyncCallback.VoidCallback{
@Override public void processResult(int rc, String path, Object ctx) { System.out.println(“异步删除节点触发:[path:”+path+",rc:"+rc+",ctx:"+ctx+"]"); } }
运行以后,可以看到控制台的输出如下:
创建无序节点:/test01 创建有序节点:/test020000000011 异步删除节点触发:[path:/test02,rc:-101,ctx:我是触发异步删除的数据]
获取数据
在Zookeeper中有两种获取数据的接口,一种是获取当前节点的相关数据信息,一种是获取当前节点下的子节点相关的数据信息,接下来我们来看看这两种Api接口
getData
客户端通过getData方法可以获取当前节点的相关数据内容,方法如下:
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException //第二个 public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException //第三个 public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx) //第四个 public void getData(String path, boolean watch, DataCallback cb, Object ctx)
可见这里也提供了同步获取数据和异步获取数据的方法,接下来我们来看看对应参数的说明:
path
path是需要获取数据的节点所在的路径
watcher
watcher是用来注册的通知接口,如果节点发生变更,就会通过当前的接口通知客户端
stat
stat是用来描述该数据节点的状态信息,如果我们需要获取当前节点的状态信息,可以在客户端创建该变量传入方法中,有服务端替换该变量
cb
cb是异步获取数据的时候注册的回调函数接口,类型为DataCallback
ctx
ctx是在异步获取数据过程中传递给异步回调函数接口的实例,一般用来传递上下文
接下来,我们来编写代码获取一下节点的内容数据:
public class GetDataDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, KeeperException, InterruptedException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”,5 * 1000, new GetDataDemo()); LATCH.await(); String path = zk.create("/testGetData01", “/testGetData01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建节点成功:”+path); //同步调用查询接口 Stat stat = new Stat(); byte[] data = zk.getData("/testGetData01", true, stat); System.out.println(“同步获取当前节点内容:”+new String(data)+",stat:"+stat); //异步调用查询接口 zk.getData("/testGetData01", true,new IDataCallback(),“我是异步获取数据的回调函数”); Thread.sleep(5 * 1000); }
@Override public void process(WatchedEvent event) { if(Event.KeeperState.SyncConnected == event.getState()){ LATCH.countDown(); } } }
class IDataCallback implements AsyncCallback.DataCallback{
@Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { System.out.println(“异步获取节点数据:[path”+path+",ctx:"+ctx+",rc:"+rc+",data:"+new String(data)+","+data.toString()+",stat:"+stat+"]"); } }
运行以后,控制台的输出如下:
创建节点成功:/testGetData01 同步获取当前节点内容:/testGetData01,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40
异步获取节点数据:[path/testGetData01,ctx:我是异步获取数据的回调函数,rc:0,data:/testGetData01,[B@22987e0b,stat:40,40,1579166587745,1579166587745,0,0,0,72059329994162178,14,0,40 ]
getChildren
如果当前客户端想要获取当前节点下的子节点的内容数据,就需要使用getChildren方法,方法的定义如下:
public List getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException //第二个 public List getChildren(String path, boolean watch)throws KeeperException, InterruptedException //第三个 public void getChildren(final String path, Watcher watcher,ChildrenCallback cb, Object ctx) //第四个 public void getChildren(String path, boolean watch, ChildrenCallback cb,Object ctx) //第五个 public List getChildren(final String path, Watcher watcher,Stat stat) hrows KeeperException, InterruptedException //第六个 public List getChildren(String path, boolean watch, Stat stat) hrows KeeperException, InterruptedException //第七个 public void getChildren(final String path, Watcher watcher,Children2Callback cb, Object ctx) //第八个 public void getChildren(String path, boolean watch, Children2Callback cb,Object ctx)
可见与getData等Api一样,都提供了多个场景下使用的api接口,并且都有同步异步两种Api的使用,而这里的参数前面都有介绍,可以参照上面的参数说明。接下来,我们来看看同步获取子节点数据与异步Api的使用:
public class GetChildrenDemo implements Watcher { private static final CountDownLatch LATCH = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException { ZooKeeper zk = new ZooKeeper(“127.0.0.1:2181”,5 * 1000, new GetChildrenDemo()); LATCH.await(); String path = zk.create("/testGetChildren01", “/testGetChildren01”.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println(“创建节点成功:”+path); //同步调用查询接口 Stat stat = new Stat(); List data = zk.getChildren("/testGetChildren01", true); System.out.println(“同步获取当前节点下的子节点:”+ data);
|