相关命令
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
get [-s] [-w] path
[-s] 返回携带元信息 [-w] 添加一个Watcher,一次性的
实现思路
- 利用zookeeper的持久化节点,保存数据
- 针对节点,添加watcher,监听 NodeDataChanged 事件
考虑点
- 每个节点的保存的数据大小有上限,默认为1M,可以通过设置java启动变量进行变更(要先变动服务端、再变动客户端)
org.apache.jute.BinaryInputArchive#maxBuffer
- watcher监听在3.6.0版本之前是一次性,而3.6.0版本之后可以添加持续性的watcher
public void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
throws KeeperException, InterruptedException {
- 如果客户端与服务端存在网络波动,还是会出现服务端的watcher被触发之后,通知客户端失败(队列里已经没有这个事件了),所以客户端最好建立定时主动查询机制
org.apache.zookeeper.ClientCnxn.EventThread#run
- 如果要配合权限,就在创建节点的时候,设置好 ACL
代码示例实现
@Bean
public ZooKeeper zookeeper(ApplicationContext applicationContext) throws IOException, KeeperException, InterruptedException {
ZKClientConfig zkClientConfig = new ZKClientConfig();
ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 15000, new CustomConnectWatcher(), zkClientConfig);
zooKeeper.addWatch("/zk_test", new CustomNodeWatcher370(zooKeeper, applicationContext), AddWatchMode.PERSISTENT_RECURSIVE);
return zooKeeper;
}
@Bean
public ApplicationConfigProvider applicationConfigProvider(ZooKeeper zookeeper) throws KeeperException, InterruptedException {
ApplicationConfigProvider applicationConfigProvider = new ApplicationConfigProvider();
byte[] data = zookeeper.getData("/zk_test", false, null);
applicationConfigProvider.refresh(new String(data));
return applicationConfigProvider;
}
@Slf4j
@Component
public class ConsoleOutput {
@Autowired
ApplicationConfigProvider applicationConfigProvider;
@Scheduled(fixedDelay = 5 * 1000L, initialDelay = 5 * 1000L)
public void print() {
log.info("目前配置为 :{}", applicationConfigProvider.getMysqlPassword());
}
}
@Slf4j
public class CustomNodeWatcher370 implements Watcher {
private ZooKeeper zooKeeper;
private ApplicationContext applicationContext;
public CustomNodeWatcher370(ZooKeeper zooKeeper, ApplicationContext applicationContext) {
this.zooKeeper = zooKeeper;
this.applicationContext = applicationContext;
}
@SneakyThrows
@Override
public void process(WatchedEvent event) {
if (Objects.equals(event.getType(), Event.EventType.NodeDataChanged)) {
byte[] data = zooKeeper.getData(event.getPath(), false, null);
applicationContext.getBean(ApplicationConfigProvider.class).refresh(new String(data));
}
}
}
@Slf4j
public class CustomNodeWatcher implements Watcher {
private ZooKeeper zooKeeper;
private ApplicationConfigProvider applicationConfigProvider;
public CustomNodeWatcher(ZooKeeper zooKeeper, ApplicationConfigProvider applicationConfigProvider) {
this.zooKeeper = zooKeeper;
this.applicationConfigProvider = applicationConfigProvider;
}
@SneakyThrows
@Override
public void process(WatchedEvent event) {
if (Objects.equals(event.getType(), Event.EventType.NodeDataChanged)) {
byte[] data = zooKeeper.getData(event.getPath(), new CustomNodeWatcher(zooKeeper, applicationConfigProvider), null);
applicationConfigProvider.refresh(new String(data));
}
}
}
原命令操作实例
- 我们看到目前节点(/zk_test) 的数据为 “香槟不好喝”,数据版本号为 13
 - 我们在 get 命令里,同时添加了一个 watcher
- 重新设置节点数据,发现 watcher 被触发

|