Zookeeper
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
zookeeper=文件系统+监听通知机制。
基本使用:
将软件包移动到Linux中去
输入解压命令解压
tar -zxvf zookeeper
将文件改名为zookeeper
进入zookeeper/conf中,将zoo_sample.cfg 更名为 zoo.cfg
然后进入zookeeper/bin目录中就可以使用zookeeper的启动命令了
zookeeper支持很多特定的命令来和它交互,主要是用来查询zookeeper的信息 。
-stat 查看状态信息
-ruok 查看zookeeper是否启动
-dump 列出没有处理的节点,临时节点
-conf 查看服务器配置
-cons 显示连接到服务端的信息
-envi 显示环境变量信息
-mntr 查看zk的健康信息
-wchs 展示watch的信息
-wchc和wchp 显示session的watch信息 path的watch信息
白名单
如果在使用四字命令时出现下面这个提示,说明该命令不在zookeeper的白名单里。
例如输入 echo ruok|nc 127.0.0.1 2181出现以下提示:
找到conf目录下的zoo.cfg 在最后一行添加 :4lw.commands.whitelist=* 保存退出,如果zookeeper服务开启需要重启之后使用。
zoo.cfg解读
tickTime
tickTime:通信心跳数,Zookeeper服务器心跳时间,单位毫秒
Zookeeper使用的基本时间, 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳,时间单位为毫秒。
它用于心跳机制,并且设置最小的session超时时间为两倍心跳时间.(session的最小超时时间是2*tickTime。)
initLimit
这个配置项是用来配置Zookeeper接收Follower客户端(这里所说的客户端不是用户链接Zookeeper服务器的客户端,而是Zookeeper服务器集群中连接到leader的Follower服务器,Follower在启动过程中,会从Leader同步所有最新数据,然后确定自己能够对外服务的起始状态。Leader允许Follower在 initLimit 时间内完成这个工作)初始化连接是最长能忍受多少个心跳的时间间隔数。
当已经超过10个心跳的时间(也就是tickTime)长度后Zookeeper服务器还没有收到客户端返回的信息,那么表明这个客户端连接失败。总的时间长度就是10*2000=20秒
syncLimit
syncLimit:LF同步通信时限
集群中Leader与Follower之间的最大响应时间单位。
在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态, 假如响应超过syncLimit * tickTime(假设syncLimit=5 ,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。),Leader认为Follwer死掉,从服务器列表中删除Follwer。
在运行过程中,Leader负责与ZK集群中所有机器进行通信,例如通过一些心跳检测机制,来检测机器的存活状态。
如果L发出心跳包在syncLimit之后,还没有从F那收到响应,那么就认为这个F已经不在线了。
dataDir
dataDir:数据文件目录+数据持久化路径
保存内存数据库快照信息的位置,如果没有其他说明,更新的事务日志也保存到数据库。
clientPort
clientPort:客户端连接端口
监听客户端连接的端口。
数据模型/znode节点深入
Znode的数据模型
是什么
Znode维护了一个stat结构,这个stat包含数据变化的版本号、访问控制列表变化、还有时间戳。版本号和时间戳一起,可让Zookeeper验证缓存和协调更新。每次znode的数据发生了变化,版本号就增加。
例如,无论何时客户端检索数据,它也一起检索数据的版本号。并且当客户端执行更新或删除时,客户端必须提供他正在改变的znode的版本号。如果它提供的版本号和真实的数据版本号不一致,更新将会失败。
ZooKeeper的Stat结构体
stat | 解释 |
---|
czxid | 引起这个znode创建的zxid,创建节点的事务的zxid(ZooKeeper Transaction Id) | ctime | 被创建的毫秒数(从1970年开始) | mzxid | znode最后更新的zxid | mtime | znode最后修改的毫秒数(从1970年开始) | pZxid | znode最后更新的子节点zxid | cversion | znode子节点变化号,znode子节点修改次数 | dataversion | znode数据变化号 | aclVersion | znode访问控制列表的变化号 | ephemeralOwner | 如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0。 | dataLength | znode的数据长度 | numChildren | znode子节点数量 |
总结
zookeeper内部维护了一套类似UNIX的树形数据结构:由znode构成的集合,
znode的集合又是一个树形结构, 每一个znode又有很多属性进行描述。 Znode = path + data + Stat
基础命令和Java客户端操作
zkCli的常用命令操作
一句话:和redis的KV键值对类似,只不过key变成了一个路径节点值,v就是data
Zookeeper表现为一个分层的文件系统目录树结构 不同于文件系统之处在于:zk节点可以有自己的数据,而unix文件系统中的目录节点只有子节点
常用命令
四字命令
Java客户端操作
1、需要引入ZooKeeper的maven依赖
例如:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BzkVUDR0-1638242311848)(img/Snipaste_2021-10-08_13-05-31.png)]
2、可以配合log4j输入日志信息,需要配置log4j.xml配置文件
3、Code
package com.wjj;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class ZooKeeperTest {
public static final String CONNECTSTRING = "192.168.190.131:2181";
public static final String PATH = "/test";
public static final int SESSION_TIME = 20 * 1000;
public static void main(String[] args) throws Exception {
ZooKeeperTest zooKeeperTest = new ZooKeeperTest();
ZooKeeper zooKeeper = zooKeeperTest.startZK();
if (zooKeeper.exists(PATH, false) == null) {
zooKeeperTest.createZNode(zooKeeper,PATH,"testValue1");
String zNode = zooKeeperTest.getZNode(zooKeeper, PATH);
System.out.println("ZNode =" + zNode);
} else {
System.out.println("this zNode is created!");
}
zooKeeperTest.stopZK(zooKeeper);
}
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSTRING, SESSION_TIME, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
}
public void createZNode(ZooKeeper zk, String path, String data) throws Exception {
if (zk == null) {
throw new NullPointerException();
}
zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public String getZNode(ZooKeeper zk, String path) throws InterruptedException, KeeperException {
String result = null;
if (zk != null) {
byte[] data = zk.getData(path, false, new Stat());
result = new String(data);
}
return result;
}
public void stopZK(ZooKeeper zk) throws InterruptedException {
if (zk != null) {
zk.close();
}
}
}
通知机制
watch
通知机制
客户端注册监听它关心的目录节点, 当目录节点发生变化(数据改变、被删除、子目录节点增加删除)时, zookeeper会通知客户端。
是什么
观察者的功能
ZooKeeper 支持watch(观察)的概念。客户端可以在每个znode结点上设置一个观察。如果被观察服务端的znode结点有变更,那么watch就会被触发,这个watch所属的客户端将接收到一个通知包被告知结点已经发生变化,把相应的事件通知给设置过Watcher的Client端。
Zookeeper里的所有读取操作:getData(),getChildren()和exists()都有设置watch的选项
一句话:异步回调的触发机制
watch事件理解
一次触发
当数据有了变化时zkserver向客户端发送一个watch,它是一次性的动作,即触发一次就不再有效,类似一次性纸杯。
只监控一次
如果想继续Watch的话,需要客户端重新设置Watcher。因此如果你得到一个watch事件且想在将来的变化得到通知,必须新设置另一个watch。
发往客户端
Watches是异步发往客户端的,Zookeeper提供一个顺序保证:在看到watch事件之前绝不会看到变化,这样不同客户端看到的是一致性的顺序。
在(导致观察事件被触发的)修改操作的成功返回码到达客户端之前,事件可能在去往客户端的路上,但是可能不会到达客户端。观察事件是异步地发送给观察者(客户端)的。ZooKeeper会保证次序:在收到观察事件之前,客户端不会看到已经为之设置观察的节点的改动。网络延迟或者其他因素可能会让不同的客户端在不同的时间收到观察事件和更新操作的返回码。这里的要点是:不同客户端看到的事情都有一致的次序。
为数据设置watch
节点有不同的改动方式。可以认为ZooKeeper维护两个观察列表:数据观察和子节点观察。getData()和exists()设置数据观察。getChildren()设置子节点观察。此外,还可以认为不同的返回数据有不同的观察。getData()和exists()返回节点的数据,而getChildren()返回子节点列表。所以,setData()将为znode触发数据观察。成功的create()将为新创建的节点触发数据观察,为其父节点触发子节点观察。成功的delete()将会为被删除的节点触发数据观察以及子节点观察(因为节点不能再有子节点了),为其父节点触发子节点观察。
观察维护在客户端连接到的ZooKeeper服务器中。这让观察的设置、维护和分发是轻量级的。客户端连接到新的服务器时,所有会话事件将被触发。同服务器断开连接期间不会收到观察。客户端重新连接 时,如果需要,先前已经注册的观察将被重新注册和触发。通常这都是透明的。有一种情况下观察事件将丢失:对还没有创建的节点设置存在观察,而在断开连接期 间创建节点,然后删除。
时序性和一致性
Watches是在client连接到Zookeeper服务端的本地维护,这可让watches成为轻量的,可维护的和派发的。当一个client连接到新server,watch将会触发任何session事件,断开连接后不能接收到。当客户端重连,先前注册的watches将会被重新注册并触发。
关于watches,Zookeeper维护这些保证: (1)Watches和其他事件、watches和异步恢复都是有序的。Zookeeper客户端保证每件事都是有序派发 (2)客户端在看到新数据之前先看到watch事件 (3)对应更新顺序的watches事件顺序由Zookeeper服务所见
代码实现
一次性
package com.wjj;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class WatchOne {
private static final String CONNECTSTRING = "192.168.190.131:2181";
private static final String PATH = "/testOne";
private static final int SESSION_TIMEOUT = 50 * 1000;
private ZooKeeper zk = null;
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
public void stopZK() throws InterruptedException {
if (zk != null) {
zk.close();
}
}
public void createZNode(String path, String nodeValue) throws KeeperException, InterruptedException {
zk.create(path, nodeValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public String getZNode(String path) throws KeeperException, InterruptedException {
byte[] byteArray = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
triggerValue(path);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}, new Stat());
return new String(byteArray);
}
public String triggerValue(String path) throws KeeperException, InterruptedException {
byte[] byteArray = zk.getData(path, false, new Stat());
String retValue = new String(byteArray);
System.out.println("**************triggerValue: " + retValue);
return retValue;
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
WatchOne watchOne = new WatchOne();
watchOne.setZk(watchOne.startZK());
if (watchOne.getZk().exists(PATH, false) == null) {
watchOne.createZNode(PATH, "BBB");
System.out.println("**********************>: " + watchOne.getZNode(PATH));
Thread.sleep(Long.MAX_VALUE);
} else {
System.out.println("i have znode");
}
}
}
多次(命名服务)
package com.wjj;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class WatchMore {
private static final String CONNECTSTRING = "192.168.190.131:2181";
private static final String PATH = "/testMore";
private static final int SESSION_TIMEOUT = 50 * 1000;
private ZooKeeper zk = null;
private String lastValue = "";
public ZooKeeper startZK() throws IOException {
return new ZooKeeper(CONNECTSTRING, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
public void stopZK() throws InterruptedException {
if (zk != null) {
zk.close();
}
}
public void createZNode(String path, String nodeValue) throws KeeperException, InterruptedException {
zk.create(path, nodeValue.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
public String getZNode(String path) throws KeeperException, InterruptedException {
byte[] byteArray = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
triggerValue(path);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}, new Stat());
return new String(byteArray);
}
public boolean triggerValue(String path) throws KeeperException, InterruptedException {
byte[] byteArray = zk.getData(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
triggerValue(path);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}, new Stat());
String newValue = new String(byteArray);
if (lastValue.equals(newValue)) {
System.out.println("there is no change~~~~~~~~");
return false;
} else {
System.out.println("lastValue: " + lastValue + "\t" + "newValue: " + newValue);
this.lastValue = newValue;
return true;
}
}
public ZooKeeper getZk() {
return zk;
}
public void setZk(ZooKeeper zk) {
this.zk = zk;
}
public String getLastValue() {
return lastValue;
}
public void setLastValue(String lastValue) {
this.lastValue = lastValue;
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
WatchMore watch = new WatchMore();
watch.setZk(watch.startZK());
if (watch.getZk().exists(PATH, false) == null) {
String initValue = "0000";
watch.setLastValue(initValue);
watch.createZNode(PATH, initValue);
System.out.println("**********************>: " + watch.getZNode(PATH));
Thread.sleep(Long.MAX_VALUE);
} else {
System.out.println("i have znode");
}
}
}
ZooKeeper集群
zookeeper 集群
伪分布式单机配置
说明
服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
这个配置项的书写格式比较特殊,规则如下:server.N=YYY:A:B 其中, N表示服务器编号, YYY表示服务器的IP地址, A为LF通信端口,表示该服务器与集群中的leader交换的信息的端口。 B为选举端口,表示选举新leader时服务器间相互通信的端口(当leader挂掉时,其余服务器会相互通信,选择出新的leader)
一般来说,集群中每个服务器的A端口都是一样,每个服务器的B端口也是一样。 下面是一个集群的例子: server.0=233.34.9.144:2008:6008 server.1=233.34.9.145:2008:6008 server.2=233.34.9.146:2008:6008 server.3=233.34.9.147:2008:6008
但是当所采用的为伪集群时,IP地址都一样,只能是A端口和B端口不一样。 下面是一个伪集群的例子: server.0=127.0.0.1:2008:6008 server.1=127.0.0.1:2007:6007 server.2=127.0.0.1:2006:6006 server.3=127.0.0.1:2005:6005
配制步骤
1、zookeeper-3.4.9.tar.gz解压后拷贝到/myzookeeper目录下并重新名为zk01,再复制zk01形成zk02、zk03,共计3份
2、zookeeper-3.4.9.tar.gz解压后拷贝到/myzookeeper目录下并重新名为zk01,再复制zk01形成zk02、zk03,共计3份
mydata
mylog
3、分别进入zk01-zk03各自的conf文件夹
新建zoo.cfg
4、编辑zoo.cfg
在zoo.cfg中最后加入:
server.1=127.0.0.1:2991:3991 server.2=127.0.0.1:2992:3992 server.3=127.0.0.1:2993:3993
设置自己的数据和log路径
dataDir=/myzookeeper/zk01/mydata
dataLogDir=/myzookeeper/zk01/mylog
修改各自的clientPort
在最后面添加server的列表
5、各自mydata下面创建myid的文件,在里面写入server的数字
6、分别启动三个服务器
7、zkCli连接server,带参数指定-server
2191/2192/2193任意用客户端链接一台,会发现只需要有一个改变了,整个集群的内容自动一致性同步。
|