Zookeeper:基于观察模式设计的分布式服务管理框架
·负责存储和管理关心的数据,然后接收观察者的注册
·简单来说,Zookeepoer = 文件系统 + 通知机制
·clientPort = 2181;客户端的连接端口
Zookeeper特点
·Zookeeper:一个领导者(Leader),多个跟随者(follower)组成的集群
·集群中只要有半数以上存活,Zookeeper集群就能正常服务
·全局一致性:每个Server保存一个相同的数据副本,Clinet无论连接哪个Server,数据都是一致的
·更新请求顺序进行,来自同一个Client的更新请求按其发送顺序依次执行
·数据更新原子性,一次数据更新要么成功,要么失败
·实时性,在一定时间范围内,Client能读到最新数据
Zookeeper数据结构
·类似于Unix文件系统,整体可以看做是一棵树,每个节点称作为ZNode,每个Znode默认存储1MB的数据,每个ZNode都可以通过路径唯一标识
Zookeeper应用场景
·统一命名服务:在分布式环境下,经常需要对应用/服务进行统一命名,便于识别--->nginx更好
·统一配置管理:分布式环境下,配置文件同步非常常见;可将配置文件写入Zookeeper的一个ZNode上,每个客户端监听这个ZNode,一旦改变,zookeeper将通知每个客户端服务器
·统一集群管理:分布式环境中,实时掌握每个节点的状态是必要的。
·服务器动态上下线:nginx更高
·软负载均衡:zookeeper记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户请求-->nginx硬负载均衡
Zookeeper安装部署
·将压缩包解压指定的路径
·修改zookeeper文件夹名为zookeeper-版本号
·添加环境变量-->随地可以使用zookeeper的脚本
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export PATH=$PATH:$ZOOKEEPER_HOME/bin
·/zookeeper-3.5.7/conf下的zoo_sample.cfg为样配置文件,修改名字为zoo.zfg
·修改配置文件中的dataDir地址为 /zookeeper-3.5.7/zkData
·启动zookeeper
bin/zkServer.sh start
·查看进程是否启动
jps -->QuorumPeerMain
·查看状态
bin/zkServer.sh status
·启动客户端
bin/zkCli.sh
·停止zookeeper
bin/zkServer.sh stop
zookeeper分布式部署
·各个服务器都安装好zookeeper
·配置环境变量
·修改zookeeper/cong/zoo.zfg的dataDir地址为 /zookeeper-3.5.7/zkData
·在zkData下创建一个myid的文件,里面添加server对应编号2,分发到其他服务器上,修改各个服务器编号
·配置zoo.cfg
·添加配置
#######################cluster##########################
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
参数解读
server.A=B:C:D
A是一个数字,同zkData下myid内一样,代表几号服务器
B代表这个服务器的地址
C是这个服务器和集群中Leader服务器交换信息的端口
D表示万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,用来执行选举制服务器互相通信的端口
Zookeeper启停脚本
#!/bin/bash
if [ $# -lt 1 ]
then
echo "No Args Input"
exit
fi
case $1 in
"start")
for i in hadoop102 hadoop103 hadoop104
do
echo "==================$i=================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh start
done
for i in hadoop102 hadoop103 hadoop104
do
echo "==================$i=================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh status
done
;;
"stop")
for i in hadoop102 hadoop103 hadoop104
do
echo "==================$i=================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh stop
done
;;
"status")
for i in hadoop102 hadoop103 hadoop104
do
echo "==================$i=================="
ssh $i /opt/module/zookeeper-3.5.7/bin/zkServer.sh status
done
;;
*)
echo "Args Error"
;;
esac
客户端命令
·增
·create path
·create path "" //带值
·create -e path 临时节点:随着创建节点的存在而存在
·create -s path 带序号节点:带有创建的顺序号
·删
·delect path 不能删除非空的
·delectall path 递归删除
·改
·set path "" 修改节点的值
·查
·ls path 查看子节点
·get path 查看节点的值
·stat path 打印节点的结构体信息
·ls -s path 查看子节点并打印当前节点的结构信息
·get -s path 查看节点的值,并打印当前节点的结构信息
·监听
·ls -w 打印子节点,并监听子节点变化(增加或者删除都会监听到)--只能做单次监听
·get -w 打印节点的值,并监听节点值变化
Zookeeper内部原理
·节点类型
·持久:客户端和服务器端断开连接后,创建的节点不删除
·持久化目录节点
·持久化顺序编号目录节点
·临时:客户端和服务器端断开连接后,创建的节点自动删除
·临时目录节点
·临时顺序编号目录节点
·stat结构体
·cZxid:节点创建时间16进制时间戳
·ctime:节点创建时间
·mZxid:节点改变时间16进制时间戳
·mtime:节点创建时间
·pZxid:最后更新的子节点时间戳
·cversion:节点点修改次数
·dataversion:数据变化次数
·aclversion:访问控制列表的次数
·ephemeralOwner:看是否是临时节点,临时节点的话不为0
·dataLength:数据的长度
·numChildren:子节点数量
·监听器原理
·有一个main()线程
·在main线程中创建Zookeeper客户端,这时会创建两个线程,一个负责网络连接通信(connet)专门连接服务端,跟服务端通信,发送监听什么;一个负责监听(listener),负责接收监听的结果
·通过connect线程将注册的监听事件发送给Zookeeper服务端
·在Zookeeper服务端的注册监听器列表中将注册的监听事件添加到列表中
·Zookeeper服务端监听到有数据或路径的变化,就会将这个消息发送给listener线程
·listener线程内部调用了process()方法<此方法由服务端唤醒>
·选举机制
·半数机制
zookeeper集群中半数机器存活,集群才可用
·自私原则
首先把票投给自己
·墙头草原则
如果有比自己id大的,将票投给id大的
·投票机制
·如果是同时启动的zookeeper,id小的会将票投给id大的
·如果是逐个启动,不满足半数,集群无法工作,投票结果不作数;当到达半数时,id小的会将票投给id大的,id大的作为leader,集群可以运行,后面id更大的则直接作为Follower
·写数据流程
1、client向zookeeper的server1上发送请求
2、如果server1不是Leader,那么Server1会把接受的请求进一步转发给Leader,每个Zookeeper集群只有一个Leader。Leader会将请求广播给每个Server,各个Server会将请求加入代写队列,并向Leader发送成功信息
3、当Leader收到半数Server的成功信息,Lerader会向各个Server发送提交信息,各个Server再落实队列中的请求
4、server1会进一步通知client执行成功,这时认为整个请求成功
API应用
·导入依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
·log4j
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
·API
/**
* 1、创建对象
* 2、操作
* 3、关闭客户端对象
*/
public class ZK_Client {
private ZooKeeper zk;
@Before
public void zk_init() throws IOException, InterruptedException {
String connectString ="hadoop101:2181,hadoop102:2181,hadoop103:2181";
zk = new ZooKeeper(connectString, 10000, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
@After
public void zk_close() throws InterruptedException {
zk.close();
}
// ls /xcu
@Test
public void ls() throws KeeperException, InterruptedException {
List<String> children = zk.getChildren("/xcu/408", false);
System.out.println(children);
}
//Watcher 如何做多次监听,递归
@Test
public void lsAndWatch(String path) throws KeeperException, InterruptedException {
List<String> children = zk.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
}
});
System.out.println(children);
//线程休眠
Thread.sleep(Long.MAX_VALUE);
}
//创建子节点
@Test
public void createNode() throws KeeperException, InterruptedException {
//第二个参数为“节点的值”;ACL权限控制列表;PERSISTENT永久的,EPHEMERAL临时的
// zk.create("/xcu/408/1", "hyj".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.create("/xcu/408/2", "lhy".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
Thread.sleep(Long.MAX_VALUE);
}
//获取节点的值,不监听
@Test
public void get() throws KeeperException, InterruptedException {
// byte[] data = zk.getData("/xcu", false, null);
// System.out.println(new String(data));
//判断节点是否存在
Stat stat = zk.exists("/xcu", false);
byte[] data = zk.getData("/xcu", false, stat);
System.out.println(new String(data));
}
//获取节点的值,单次监听,
@Test
public void getAndWatch() throws KeeperException, InterruptedException {
//判断节点是否存在
Stat stat = zk.exists("/xcu", false);
byte[] data = zk.getData("/xcu", new Watcher() {
@Override
public void process(WatchedEvent event) {
}
}, stat);
System.out.println(new String(data));
Thread.sleep(Long.MAX_VALUE);
}
//设置节点的值
@Test
public void set() throws KeeperException, InterruptedException {
//判断节点是否存在
Stat stat = zk.exists("/xcu/408", false);
if (stat == null) {
System.out.println("节点不存在");
return;
}
zk.setData("/xcu/408","BigData".getBytes(),-1);
}
//删除节点的值
@Test
public void delete() throws KeeperException, InterruptedException {
//判断节点是否存在
Stat stat = zk.exists("/xcu/408", false);
if (stat == null) {
System.out.println("节点不存在");
return;
}
zk.delete("/xcu/408/1",-1);
System.out.println("删除成功");
}
//递归删除节点的值
@Test
public void deleteAll(String path,ZooKeeper zk) throws KeeperException, InterruptedException {
//判断节点是否存在
Stat stat = zk.exists(path, false);
if (stat == null) {
System.out.println("节点不存在");
return;
}
//获取当前传入节点下的所有子节点
List<String> children = zk.getChildren(path, false);
if(children.isEmpty()){
//说明传入的节点没有子节点,可以直接删除
zk.delete(path,stat.getVersion());
}else{
//如果传入的节点有子节点,循环所有子节点
for (String child : children) {
//删除子节点,但是不知道子节点下面有没有子节点,所以递归调用
deleteAll(path + "/",zk);
}
zk.delete(path,stat.getVersion());
}
System.out.println("删除成功");
}
//获取子节点的列表,并循环监听
public void LsAndWatch(String path) throws KeeperException, InterruptedException {
List<String> children = zk.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
try {
lsAndWatch(path);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
System.out.println(children);
}
}
|