Zookeeper安装
http://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
Zookeeper配置文件
tickTime=2000
dataDir=/tmp/zookeeper
dataLogDir=/tmp/zookeeperLog
admin.serverPort=8082
clientPort=2181
Zookeeper作用
? 配置管理
? 分布式锁
? 集群管理
Zookeeper数据模型
persistent持久化节点
ephemeral临时节点
persistent_sequential持久化顺序节点
ephemeral_sequential临时顺序节点
Zookeeper服务端命令
./zkServer.sh start
./zkServer.sh status
./zkServer.sh stop
./zkServer.sh restart
Zookeeper客户端命令
./zkCli.sh -server localhost:2181
quit
ls path
create path data
delete path
rmr path
ls -s path
get path
set path data
Zookeeper Java操作
创建连接
导入curator-framework、curator-recipes、slf4j-api、slf4j-log4j12
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
baseSleepTimeMs,
maxRetries
);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"ip:port",
sessionTimeoutMs,
connectionTimeoutMs,
retryPolicy
);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
baseSleepTimeMs,
maxRetries
);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectionString("ip:port")
.sessionTimeoutMs(num)
.connectionTimeoutMs(num)
.retryPolicy(retryPolicy)
.build();
client.start();
添加节点
client
.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/app", data[]);
查询节点
String data = client.getData().forPath("/app");
List<String> datas = client.getChildren().forPath("/app");
Stat status = new Stat();
String data = client.getData().storingStatIn(status).forPath("/app");
修改节点
client.setData().forPath("/app", newData[]);
int version = status.getVersion();
client.setData().withVersion(version).forPath("/app", newDate[][]);
删除节点
client.delete().forPath("/app");
client.delete().deleteingChildrenIfNeeded().forPath("/app");
client.delete().guaranteed().forPath("/app");
client.delete().inBackground(new BackgroundCallback() {
public void processResutl(CuratorFramework client, CuratorEvent event) throws Exception {}
}).forPath("/app");
Wctch事件监听
监听节点
NodeCache nodeCache = new NodeCache(client, "/app");
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged throws Exception() {
byte[] data = nodeCache.getCurrentData().getData();
}
});
nodeCache.start(true);
监听子节点
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/app", true);
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {}
});
pathChildrenCache.start();
监听节点及其子节点
TreeCache treeCache = new TreeCache(client, "/app");
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {}
});
treeCache.start();
分布式锁
客户端获取锁,创建节点,使用完锁,删除节点
创建临时节点,防止客户端宕机未能删除节点
创建顺序节点,使最小的节点获取锁,其余节点监听比其小一个节点的删除事件
InterProcessMutex lock = new InterProcessMutex(client, "/lock");
lock.acquire(3, TimeUnit.SECONDS);
lock.release();
集群搭建
在zookeeper中的data目录下创建一个myid文件,内容是服务器的ID
在zookeeper中的zoo.cfg配置集群服务器的IP和Port,server.id=ip:port
集群角色
Leader
? 处理事务请求
? 集群内部各服务器的调度者
Follower
? 处理非事务请求,转发事务请求给leader
? 参与leader选举投票
Observer
? 处理客户端非事务请求,转发事务请求给leader
|