Zookeeper实现配置中心
一、环境准备
1 . 准备集群
搭建Zookeeper集群,我准备了四服务,分别是192.168.91.129,192.168.91.130, 192.168.91.131, 192.168.91.132,我本地用虚拟机开的四台机器,家里有条件的可以直接搞几台阿里云服务器,其实一台机器也能搞,推荐用三台服务器
2 . 导入依赖(客户端版本和Server端版本保持一致)
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
二、实现思路
1 . 创建Zookeeper对象(利用门栓保证zk可用) 2 . 判断配置中心的节点是否存在(使用回调的方式,需要传递 watch和statWatch),然后阻塞线程,直到获取配置完成 3 . 几种情况
- 第一个分支【没有配置文件节点】:如果没有配置文件节点,那需要在exists的Watch中对event时间Type中NodeCreated进行处理(一旦配置文件创建了就会回调这个地方),处理无非就是去get配置节点
- 第二个分支【有配置文件】:这个是正常流程用getData把节点数据读取回来(需要传递Watch监控后续更新)
- 第三个分支【配置文件更新】:getData的时候传递Watch对后续一些操作进行了监听,如果配置文件更新需要重新getData
- 第四个分支【配置节点删除】:这个地方有处理的方式比较宽泛,如果要求配置文件节点被删除了,配置一定也要同步删除,强一致要求配置和节点同步,那就需要清空配置文件,并且重新阻塞,重新getData,如果没有这种需求那这里就看着处理就行了(毕竟谁没事会删除配置文件呢)
三、代码实现
1 . Test测试
package org.example.configdemo2;
import org.apache.zookeeper.ZooKeeper;
import org.example.congfig.MyConf;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class TestMain {
private ZooKeeper zk;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
@Before
public void connection() {
try {
zk = new ZooKeeper("192.168.91.129,192.168.91.130,192.168.91.131,192.168.91.132/configTestdemo", 1000, event -> {
switch (event.getState()) {
case Unknown:
break;
case Disconnected:
break;
case NoSyncConnected:
break;
case SyncConnected:
System.out.println("zk connection success !!");
countDownLatch.countDown();
break;
case AuthFailed:
break;
case ConnectedReadOnly:
break;
case SaslAuthenticated:
break;
case Expired:
break;
case Closed:
break;
default:
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void show() {
WatchAndCallback watchAndCal = new WatchAndCallback(zk);
MyConf conf = watchAndCal.getConf();
while (true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String config = conf.getConfig();
if(config == null || "".equals(config)){
conf = watchAndCal.getConf();
}
System.out.println(config);
}
}
}
2 . WatchAndCallback类(一个工具类)
package org.example.configdemo2;
import lombok.Data;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.example.congfig.MyConf;
import java.util.concurrent.CountDownLatch;
@Data
public class WatchAndCallback implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
private ZooKeeper zk;
private MyConf myConf = new MyConf();
public static final String PATH_NODE = "/cnf";
private CountDownLatch countDownLatch = new CountDownLatch(1);
public WatchAndCallback(ZooKeeper zk) {
this.zk = zk;
}
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
break;
case NodeCreated:
zk.getData(PATH_NODE, this, this, "get");
break;
case NodeDeleted:
myConf.setConfig("");
countDownLatch = new CountDownLatch(1);
System.out.println("配置删除,阻塞等待... ...");
zk.getData(PATH_NODE, this, this, "get");
break;
case NodeDataChanged:
zk.getData(PATH_NODE, this, this, "get");
break;
case NodeChildrenChanged:
break;
case DataWatchRemoved:
break;
case ChildWatchRemoved:
break;
case PersistentWatchRemoved:
break;
default:
}
}
public MyConf getConf() {
zk.exists(PATH_NODE, this, this, "exists");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return myConf;
}
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (stat != null) {
zk.getData(PATH_NODE, this, this, "get");
}
}
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (stat != null) {
myConf.setConfig(new String(data));
countDownLatch.countDown();
}
}
}
四、测试结果
|