分布式配置

连接ZK集群
ZK 工具类(ZKUtils.java):
package org.garen.study.config;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Helper that opens a ZooKeeper session and blocks the caller until the
 * session reports {@code SyncConnected}.
 *
 * <p>The static state below is reassigned without synchronization, so
 * {@link #getZK()} is meant to be called from one thread at a time.
 */
public class ZKUtils {
    /** Most recently created client; returned by {@link #getZK()}. */
    private static ZooKeeper zk;

    // Host list followed by the parent path ("/testConf") the client is rooted at.
    // NOTE(review): 192.168.174.62 is listed twice — confirm whether another host was intended.
    private static String connectString = "192.168.174.62:2181,192.168.174.63:2181,192.168.174.64:2181,192.168.174.62:2181/testConf";

    /** Session watcher shared by every client created here. */
    private static DefaultWatch defaultWatch = new DefaultWatch();

    /** Latch released by {@link DefaultWatch} once the current session is connected. */
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Creates a new ZooKeeper client and waits until it is connected.
     *
     * @return the newly created client; if creation failed with an
     *         {@link IOException}, whatever client was created by an earlier
     *         call (possibly {@code null})
     */
    public static ZooKeeper getZK() {
        try {
            // Use a fresh latch per call: a latch that already reached zero
            // would let later calls return before their session is connected.
            countDownLatch = new CountDownLatch(1);
            // Install the latch BEFORE creating the client so the watcher can
            // never observe SyncConnected while its latch is still unset.
            defaultWatch.setCountDownLatch(countDownLatch);
            zk = new ZooKeeper(connectString, 1000, defaultWatch);
            countDownLatch.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return zk;
    }
}
创建连接的watch(DefaultWatch.java)
package org.garen.study.config;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
 * Session-level watcher: logs every event it receives and releases the
 * configured latch when the session state becomes {@code SyncConnected}.
 */
public class DefaultWatch implements Watcher {
    // Set by the code that opens the connection and read by the thread that
    // delivers events, hence volatile.
    volatile CountDownLatch countDownLatch;

    /**
     * @param countDownLatch latch to release on the next {@code SyncConnected} event
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * Logs the event and, for {@code SyncConnected}, counts the latch down.
     * All other session states are ignored.
     *
     * @param event the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        System.out.println("Connect zk cluster, create zk watch: " + event.toString());
        switch (event.getState()) {
            case SyncConnected: {
                // Read the field once; it may not have been set yet.
                CountDownLatch latch = countDownLatch;
                if (latch != null) {
                    latch.countDown();
                }
                break;
            }
            default:
                // No action for the remaining session states.
                break;
        }
    }
}
测试
测试类(TestConfig.java): 其中 conn() 方法通过 ZKUtils.getZK() 创建连接并将连接对象保存到 zk 字段,ZKUtils 的代码见前面“连接ZK集群”章节。
package org.garen.study.config;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Manual demo test: connects to ZooKeeper, waits for the configuration to be
 * available and then prints it every two seconds until interrupted.
 */
public class TestConfig {
    ZooKeeper zk;

    /** Opens the connection used by the test. */
    @Before
    public void conn() {
        zk = ZKUtils.getZK();
    }

    /** Closes the connection, if one was established. */
    @After
    public void close() {
        if (zk == null) {
            // Connection setup failed; nothing to close.
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Polls the configuration every two seconds. Prints the value when present;
     * otherwise prints "no conf ..." and blocks until a value is available again.
     * The loop ends only when the thread is interrupted.
     */
    @Test
    public void getConf() {
        MyConf myConf = new MyConf();
        WatchCallBack watchCallBack = new WatchCallBack();
        watchCallBack.setZk(zk);
        watchCallBack.setMyConf(myConf);
        watchCallBack.awaitMyConf();
        while (true) {
            // Read once: the callback may clear the value between a check and a print.
            String current = myConf.getConf();
            if (current == null) {
                System.out.println("no conf ...");
                watchCallBack.awaitMyConf();
            } else {
                System.out.println(current);
            }
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Stop polling instead of spinning with the interrupt swallowed.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            }
        }
    }
}
配置文件对象类(MyConf.java)
package org.garen.study.config;
/**
 * Holder for the current configuration value.
 *
 * <p>The value is written by the ZooKeeper callback and polled by the test
 * thread, so the field is {@code volatile} to make each write visible to readers.
 */
public class MyConf {
    private volatile String conf;

    /**
     * @return the current configuration text, or {@code null} when none is set
     */
    public String getConf() {
        return conf;
    }

    /**
     * @param conf the new configuration text; {@code null} clears the value
     */
    public void setConf(String conf) {
        this.conf = conf;
    }
}
回调方法工具类(WatchCallBack.java)
package org.garen.study.config;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.CountDownLatch;
/**
 * Watcher and async callback that keeps a {@link MyConf} in sync with the data
 * of the {@code /AppConf} node and lets a caller block until a value is available.
 */
public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {
    ZooKeeper zk;
    MyConf myConf;
    // Replaced from process() and awaited from awaitMyConf() on different
    // threads, hence volatile.
    volatile CountDownLatch countDownLatch = new CountDownLatch(1);

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    public void setMyConf(MyConf myConf) {
        this.myConf = myConf;
    }

    /**
     * Issues an asynchronous {@code exists} on {@code /AppConf} (registering this
     * object as watcher) and blocks until the data callback has stored a value.
     * If the waiting thread is interrupted, the interrupt flag is restored and
     * the method returns without a value being guaranteed.
     */
    public void awaitMyConf() {
        zk.exists("/AppConf", this, this, "ABC");
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Reacts to node events: (re)loads the data when the node is created or
     * changed, and clears the value when the node is deleted.
     */
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case NodeCreated:
            case NodeDataChanged:
                zk.getData("/AppConf", this, this, "BCD");
                break;
            case NodeDeleted:
                // Re-arm the latch BEFORE clearing the value, so a reader that
                // sees null and calls awaitMyConf() waits on the new latch
                // rather than on the old one that is already open.
                countDownLatch = new CountDownLatch(1);
                myConf.setConf(null);
                break;
            default:
                // Other event types are not relevant here.
                break;
        }
    }

    /** {@code exists} result: when the node is present, fetch its data. */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if (stat != null) {
            zk.getData("/AppConf", this, this, "BCD");
        }
    }

    /** {@code getData} result: store the value and release any waiter. */
    @Override
    public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
        if (data != null) {
            // Decode with an explicit charset instead of the platform default.
            String s = new String(data, java.nio.charset.StandardCharsets.UTF_8);
            myConf.setConf(s);
            countDownLatch.countDown();
        }
    }
}
运行代码进行测试
分布式锁

连接ZK集群
ZK 工具类(ZKUtils.java): 沿用“分布式配置”中的ZKUtils.java,仅将连接字符串末尾的父节点由“/testConf”改成了“/testLock”。
// Same host list as in the configuration demo; only the trailing parent path is now "/testLock".
// NOTE(review): 192.168.174.62 appears twice in the host list — confirm whether another host was intended.
private static String connectString = "192.168.174.62:2181,192.168.174.63:2181,192.168.174.64:2181,192.168.174.62:2181/testLock";
创建连接的watch(DefaultWatch.java): 沿用“分布式配置”中的DefaultWatch.java,没有任何改变。
测试
测试类(TestLock.java): 其中 conn() 方法通过 ZKUtils.getZK() 创建连接并将连接对象保存到 zk 字段,ZKUtils 的代码见前面“连接ZK集群”章节。
package org.garen.study.lock;
import org.apache.zookeeper.ZooKeeper;
import org.garen.study.config.ZKUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.stream.IntStream;
/**
 * Manual demo test: ten threads compete for the lock, each holds it for one
 * second, then releases it.
 */
public class TestLock {
    ZooKeeper zk;

    /** Opens the connection shared by all worker threads. */
    @Before
    public void conn() {
        zk = ZKUtils.getZK();
    }

    /** Closes the connection, if one was established. */
    @After
    public void close() {
        if (zk == null) {
            // Connection setup failed; nothing to close.
            return;
        }
        try {
            zk.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts ten workers that each acquire the lock, "work" for one second and
     * release it, then waits for them to finish. The wait is capped at
     * 1,000,000 ms in total, so the test ends as soon as the workers are done
     * instead of always sleeping for the full period.
     */
    @Test
    public void testLock() {
        Thread[] workers = new Thread[10];
        IntStream.range(0, workers.length).forEach(i -> {
            workers[i] = new Thread(() -> {
                String threadName = Thread.currentThread().getName();
                WatchCallBack watchCallBack = new WatchCallBack();
                watchCallBack.setZk(zk);
                watchCallBack.setThreadName(threadName);
                watchCallBack.tryLock();
                System.out.println(threadName + "working...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Not re-interrupting here: the unLock() call below must
                    // still be able to run its blocking delete.
                    e.printStackTrace();
                }
                watchCallBack.unLock();
            });
            workers[i].start();
        });

        long deadline = System.currentTimeMillis() + 1000000;
        try {
            for (Thread worker : workers) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                worker.join(remaining);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
回调方法工具类(WatchCallBack.java)
package org.garen.study.lock;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * Lock built on sequential child nodes: each contender creates a
 * {@code /lock} sequential node, the owner of the smallest node holds the lock,
 * and every other contender watches the node immediately before its own.
 */
public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    ZooKeeper zk;

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /** Name of the contending thread; used in log output and as node data. */
    String threadName;

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /** Released once this contender owns the smallest sequential node. */
    CountDownLatch countDownLatch = new CountDownLatch(1);

    // Written from the create callback and read by the thread calling unLock().
    volatile String pathName;

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    /**
     * Creates this contender's sequential node and blocks until the lock is
     * obtained. If the waiting thread is interrupted, the interrupt flag is
     * restored and the method returns without the lock being guaranteed.
     */
    public void tryLock() {
        try {
            System.out.println(threadName + " create....");
            // EPHEMERAL_SEQUENTIAL: the node disappears with the session, so a
            // crashed or killed holder cannot block the remaining contenders.
            zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");
            countDownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Releases the lock by deleting this contender's node, if one was created. */
    public void unLock() {
        String ownPath = pathName;
        if (ownPath == null) {
            // The node was never created, so there is nothing to release.
            return;
        }
        try {
            zk.delete(ownPath, -1);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /** When the watched predecessor is deleted, re-evaluate the queue. */
    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case NodeDeleted:
                zk.getChildren("/", false, this, "sdf");
                break;
            default:
                // Other event types are not relevant here.
                break;
        }
    }

    /** {@code create} result: remember the node name and inspect the queue. */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (name != null) {
            System.out.println(threadName + "create node : " + name);
            pathName = name;
            zk.getChildren("/", false, this, "sdf");
        } else {
            // The lock cannot be granted without a node; make the failure visible.
            System.err.println(threadName + " create failed, rc: " + rc + ", path: " + path);
        }
    }

    /**
     * {@code getChildren} result: take the lock when this contender's node is
     * the smallest, otherwise watch the node directly in front of it.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        String ownPath = pathName;
        if (children == null || ownPath == null) {
            // The listing failed or this contender has no node yet.
            System.err.println(threadName + " getChildren failed, rc: " + rc + ", path: " + path);
            return;
        }
        Collections.sort(children);
        int i = children.indexOf(ownPath.substring(1));
        if (i < 0) {
            // Own node is not in the listing (e.g. already deleted); do not index with -1.
            System.err.println(threadName + " own node " + ownPath + " not found among children");
            return;
        }
        if (i == 0) {
            System.out.println(threadName + " i am first ...");
            try {
                // Record the current holder's name on the parent node.
                zk.setData("/", threadName.getBytes(), -1);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
            countDownLatch.countDown();
        } else {
            zk.exists("/" + children.get(i - 1), this, this, "ggg");
        }
    }

    /**
     * {@code exists} result for the predecessor node. If the predecessor is
     * already gone, no deletion event will arrive for it, so the queue is
     * re-read immediately instead of waiting forever.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        System.out.println("last node exists : [" + "rc:" + rc + ", path: " + path + ", ctx: " + ctx + ", stat: " + stat + "]");
        if (stat == null) {
            zk.getChildren("/", false, this, "sdf");
        }
    }
}
运行代码进行测试
-
运行前  -
运行结果
D:\GarenGosling\Java\jdk1.8.0_291\bin\java.exe -ea -Didea.test.cyclic.buffer.size=1048576 "-javaagent:D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\lib\idea_rt.jar=51228:D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\bin" -Dfile.encoding=UTF-8 -classpath "D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\lib\idea_rt.jar;D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\plugins\junit\lib\junit5-rt.jar;D:\GarenGosling\JetBrains\IntelliJ IDEA Educational Edition 2020.2.1\plugins\junit\lib\junit-rt.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\charsets.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\deploy.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\access-bridge-64.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\cldrdata.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\dnsns.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\jaccess.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\jfxrt.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\localedata.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\nashorn.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunec.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunjce_provider.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunmscapi.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\sunpkcs11.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\ext\zipfs.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\javaws.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jce.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jfr.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jfxswt.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\jsse.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\management-agent.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\plugin.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\resources.jar;D:\GarenGosling\Java\jdk1.8.0_291\jre\lib\rt.jar;D:\GarenGosling\workspace\idea_study\study-03\target\test-classes;D:\GarenGosling\workspace\idea_study\study-03\ta
rget\classes;D:\GarenGosling\apache-maven-3.6.3\repo\junit\junit\4.11\junit-4.11.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\hamcrest\hamcrest-core\1.3\hamcrest-core-1.3.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\zookeeper\zookeeper\3.5.9\zookeeper-3.5.9.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\zookeeper\zookeeper-jute\3.5.9\zookeeper-jute-3.5.9.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\apache\yetus\audience-annotations\0.5.0\audience-annotations-0.5.0.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-handler\4.1.50.Final\netty-handler-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-common\4.1.50.Final\netty-common-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-resolver\4.1.50.Final\netty-resolver-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-buffer\4.1.50.Final\netty-buffer-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport\4.1.50.Final\netty-transport-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-codec\4.1.50.Final\netty-codec-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport-native-epoll\4.1.50.Final\netty-transport-native-epoll-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\io\netty\netty-transport-native-unix-common\4.1.50.Final\netty-transport-native-unix-common-4.1.50.Final.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;D:\GarenGosling\apache-maven-3.6.3\repo\org\slf4j\slf4j-log4j12\1.7.25\slf4j-log4j12-1.7.25.jar;D:\GarenGosling\apache-maven-3.6.3\repo\log4j\log4j\1.2.17\log4j-1.2.17.jar" com.intellij.rt.junit.JUnitStarter -ideVersion5 -junit4 org.garen.study.lock.TestLock,testLock
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Connect zk cluster, create zk watch: WatchedEvent state:SyncConnected type:None path:null
Thread-1 create....
Thread-4 create....
Thread-5 create....
Thread-0 create....
Thread-9 create....
Thread-8 create....
Thread-7 create....
Thread-3 create....
Thread-2 create....
Thread-6 create....
Thread-3create node : /lock0000000030
Thread-4create node : /lock0000000031
Thread-7create node : /lock0000000032
Thread-6create node : /lock0000000033
Thread-8create node : /lock0000000034
Thread-0create node : /lock0000000035
Thread-5create node : /lock0000000036
Thread-2create node : /lock0000000037
Thread-9create node : /lock0000000038
Thread-1create node : /lock0000000039
Thread-3 i am first ...
Thread-3working...
last node exists : [rc:0, path: /lock0000000030, ctx: ggg, stat: 42949673243,42949673243,1627737835099,1627737835099,0,0,0,0,8,0,42949673243
]
last node exists : [rc:0, path: /lock0000000031, ctx: ggg, stat: 42949673244,42949673244,1627737835100,1627737835100,0,0,0,0,8,0,42949673244
]
last node exists : [rc:0, path: /lock0000000032, ctx: ggg, stat: 42949673245,42949673245,1627737835101,1627737835101,0,0,0,0,8,0,42949673245
]
last node exists : [rc:0, path: /lock0000000033, ctx: ggg, stat: 42949673246,42949673246,1627737835101,1627737835101,0,0,0,0,8,0,42949673246
]
last node exists : [rc:0, path: /lock0000000034, ctx: ggg, stat: 42949673247,42949673247,1627737835101,1627737835101,0,0,0,0,8,0,42949673247
]
last node exists : [rc:0, path: /lock0000000035, ctx: ggg, stat: 42949673248,42949673248,1627737835101,1627737835101,0,0,0,0,8,0,42949673248
]
last node exists : [rc:0, path: /lock0000000036, ctx: ggg, stat: 42949673249,42949673249,1627737835101,1627737835101,0,0,0,0,8,0,42949673249
]
last node exists : [rc:0, path: /lock0000000037, ctx: ggg, stat: 42949673250,42949673250,1627737835101,1627737835101,0,0,0,0,8,0,42949673250
]
last node exists : [rc:0, path: /lock0000000038, ctx: ggg, stat: 42949673251,42949673251,1627737835101,1627737835101,0,0,0,0,8,0,42949673251
]
Thread-4 i am first ...
Thread-4working...
Thread-7 i am first ...
Thread-7working...
Thread-6 i am first ...
Thread-6working...
Thread-8 i am first ...
Thread-8working...
Thread-0 i am first ...
Thread-0working...
Thread-5 i am first ...
Thread-5working...
Thread-2 i am first ...
Thread-2working...
Thread-9 i am first ...
Thread-9working...
Thread-1 i am first ...
Thread-1working...
Process finished with exit code -1
- 运行中
 - 运行后

至此,本专栏结束!
上一篇《13 Zookeeper原理知识,paxos、zab、角色功能、API开发基础》
|