ZooKeeper典型应用场景总结
1.数据发布/订阅
? 数据发布/订阅系统,即配置中心,是发布者将数据发布到ZooKeeper的一个或一系列节点上,供订阅者进行数据订阅,进而达到动态获取数据的目的,实现配置信息的集中式管理和数据的动态更新。 ? Zookeeper实现数据发布/订阅系统采用这样一种设计模式:客户端向服务端注册自己需要关注的节点,当该节点发生数据变更时,服务端就会向相应的客户端发送Watcher事件通知,客户端接收到这个消息之后,需要主动到服务端获取最新的数据。

? 知道了原理,那接下来用zookeeper实现一个简单的发布/订阅系统。
? Publish的代码如下:
public class Publish implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static Stat stat = new Stat();
private static ZooKeeper zk = null;
public static void main(String[] args) {
try {
String path1 = "/number";
String path2 = "/character";
zk = new ZooKeeper("192.168.1.8:2181", 5000, new Publish());
countDownLatch.await();
System.out.println("zookeeper connection");
zk.create(path1, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create(path2, "a".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("init number : " + new String(zk.getData(path1, true, stat)));
System.out.println("init character : " + new String(zk.getData(path2, true, stat)));
int i = 0;
while (true) {
System.out.println("publish new number:" + i);
zk.setData(path1, String.valueOf(i).getBytes(), -1);
char c = (char) (i + 97);
System.out.println("publish new character:" + c);
zk.setData(path2, ("" + c).getBytes(), -1);
Thread.sleep(5000);
i++;
}
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
System.out.println("receive watched event:" + event);
System.out.println(event.getState());
countDownLatch.countDown();
}
}
}
? Subscribe的代码如下:
public class Subscribe_num implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static Stat stat = new Stat();
private static ZooKeeper zk = null;
public static void main(String[] args) {
try {
String path = "/number";
zk = new ZooKeeper("192.168.1.8:2181", 5000, new Subscribe_num());
countDownLatch.await();
System.out.println("zookeeper connection");
System.out.println("init number : number is " + new String(zk.getData(path, true, stat)));
while (true) {
Thread.sleep(Integer.MAX_VALUE);
}
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && event.getPath() == null) {
countDownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println("path:" + event.getPath() + "\tdata has changed.\t new character :" + new String(zk.getData(event.getPath(), true, stat)));
} catch (Exception e) {
}
}
}
}
}
public class Subscribe_char implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static Stat stat = new Stat();
private static ZooKeeper zk = null;
public static void main(String[] args) {
try {
String path = "/character";
zk = new ZooKeeper("192.168.1.8:2181", 5000, new Subscribe_char());
countDownLatch.await();
System.out.println("zookeeper connection");
System.out.println("init character : character is " + new String((zk.getData(path, true, stat))));
while (true) {
Thread.sleep(Integer.MAX_VALUE);
}
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && event.getPath() == null) {
countDownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println("path:" + event.getPath() + "\tdata has changed.\t new character :" + new String(zk.getData(event.getPath(), true, stat)));
} catch (Exception e) {
}
}
}
}
}
? Publish的运行结果如下:  ? Subscribe_num的运行结果如下:
 ? Subscribe_character的运行结果如下: 
2.DNS服务
? DNS是域名系统(Domain Name System)的缩写,可以将DNS系统看作是一个超大规模的分布式映射表,用于将域名和IP地址进行一一映射,方便人们通过域名来访问互联网站点。 ? 用ZooKeeper可以实现一种动态的DNS服务,具体如下。
(1)域名配置
? 对域名的配置就相当于在zookeeper上一级一级增加节点。例如:/DDNS/app1/server.app1.company1.com。每个应用都可以创建一个属于自己的数据节点作为域名配置的根节点,例如/DDNS/app1,在这个节点上,每个应用都可以将自己的域名配置上去。 
(2)域名解析
? 传统的DNS解析中,不需要关心域名的解析过程,所有这些工作都交由操作系统的域名和IP地址映射机制。在DDNS中,域名的解析过程都是由每一个应用自己负责。 ? 应用首先从域名节点中获取一份IP地址和端口的配置,进行自行解析。同时,每个应用还会从域名节点上注册一个数据变更Watcher监听,以便及时收到域名变更的通知。
(3)域名变更
? 当域名对应的IP地址或是端口变更,这个时候就需要进行域名变更操作。在DDNS中,只需要对指定的域名节点进行更新操作,zookeeper就会向订阅它的客户端发送这个事件通知,应用在接收到这个事件通知后,会再次进行域名配置的获取。
? 接下来将上述这一DDNS系统简单的实现一下。
Server的代码如下:
public class Server implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.1.8:2181", 5000, new Server());
countDownLatch.await();
System.out.println("zookeeper connection");
String path1 = "/DDNS/app1";
String path2 = "/DDNS/app2";
String path3 = "/DDNS/app3";
String path4 = "/DDNS/app4";
zooKeeper.create("/DDNS", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path1, "192.168.1.1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path2, "192.168.1.2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path3, "192.168.1.3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path4, "192.168.1.4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path1 + "/server.app1.company1.com", "192.168.1.1:8041".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path4 + "/server.app4.company1.com", "192.168.1.4:8081".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zooKeeper.create(path4 + "/server.app4.company2.com", "192.168.1.4:8182".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Thread.sleep(20000);
zooKeeper.setData(path1 + "/server.app1.company1.com", "192.168.1.7:8141".getBytes(), -1);
zooKeeper.setData(path4 + "/server.app4.company2.com", "192.168.1.4:8148".getBytes(), -1);
Thread.sleep(20000);
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
System.out.println("receive watched event:" + event);
System.out.println(event.getState());
countDownLatch.countDown();
}
}
}
App的代码如下:
public class App1_company1 implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.1.8:2181", 5000, new App1_company1());
countDownLatch.await();
String path = "/DDNS/app1/server.app1.company1.com";
System.out.println("domain name is : " + path + "\t" + "analysis to : " + new String(zooKeeper.getData(path, true, null)));
Thread.sleep(20000);
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && event.getPath() == null) {
countDownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println(event.getPath() + " is already changed. Please get new IP or Port!");
System.out.println("domain name is : " + event.getPath() + "\t" + "analysis to : " + new String(zooKeeper.getData(event.getPath(), true, null)));
} catch (Exception e) {
}
}
}
}
}
public class App4_company1 implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.1.8:2181", 5000, new App4_company1());
countDownLatch.await();
String path = "/DDNS/app4/server.app4.company1.com";
System.out.println("domain name is : " + path + "\t" + "analysis to : " + new String(zooKeeper.getData(path, true, null)));
Thread.sleep(20000);
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && event.getPath() == null) {
countDownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println(event.getPath() + " is already changed. Please get new IP or Port!");
System.out.println("domain name is : " + event.getPath() + "\t" + "analysis to : " + new String(zooKeeper.getData(event.getPath(), true, null)));
} catch (Exception e) {
}
}
}
}
}
public class App4_company2 implements Watcher {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static ZooKeeper zooKeeper = null;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("192.168.1.8:2181", 5000, new App4_company2());
countDownLatch.await();
String path = "/DDNS/app4/server.app4.company2.com";
System.out.println("domain name is : " + path + "\t" + "analysis to : " + new String(zooKeeper.getData(path, true, null)));
Thread.sleep(20000);
} catch (Exception e) {
}
}
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected == event.getState()) {
if (Event.EventType.None == event.getType() && event.getPath() == null) {
countDownLatch.countDown();
} else if (event.getType() == Event.EventType.NodeDataChanged) {
try {
System.out.println(event.getPath() + " is already changed. Please get new IP or Port!");
System.out.println("domain name is : " + event.getPath() + "\t" + "analysis to : " + new String(zooKeeper.getData(event.getPath(), true, null)));
} catch (Exception e) {
}
}
}
}
}
? App1_company1是对应域名的IP地址发生了变化,其执行结果如下:  ? App4_company1对应域名未发生任何变化,其执行结果如下:  ? App4_company2对应域名的端口号发生了变化,其执行结果如下: 
|