简介
Zookeeper是一个Apache开源的分布式的应用,为系统架构提供协调服务。从设计模式角度来审视:该组件是一个基于观察者模式设计的框架,负责存储和管理数据,接受观察者的注册,一旦数据的状态发生变化,Zookeeper就将负责通知已经在Zookeeper上注册的观察者做出相应的反应,从而实现集群中类似Master/Slave管理模式。ZooKeeper的目标就是封装好复杂易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。
技术架构:
Spring Boot 2.6.3、Zookeeper 3.4.6、JDK 1.8
导入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
application.yml配置
zoo:
  keeper:
    #开启标志
    enabled: true
    #服务器地址
    server: 192.168.116.100:2181
    #命名空间,被称为ZNode
    namespace: lx
    #权限控制,加密
    digest: smile:111111
    #会话超时时间
    sessionTimeoutMs: 3000
    #连接超时时间
    connectionTimeoutMs: 60000
    #最大重试次数
    maxRetries: 10
    #初始休眠时间
    baseSleepTimeMs: 1000
Zookeeper配置
package com.example.zk.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Settings for the ZooKeeper client, bound from configuration keys under the
 * {@code zoo.keeper} prefix. Getters and setters are generated by Lombok's {@code @Data}.
 *
 * @author lanx
 * @date 2022/3/6
 */
@Data
@Component
@ConfigurationProperties(prefix = "zoo.keeper")
public class ZookeeperProperties {
// Enable flag ("true"/"false" as text).
// NOTE(review): no code visible in this file reads this value - confirm whether it is honoured anywhere.
private String enabled;
// ZooKeeper connect string, e.g. "host:port".
private String server;
// Namespace configured for the client.
private String namespace;
// Credentials used with the "digest" auth scheme; the sample configuration uses the form "user:password".
private String digest;
// Session timeout, in milliseconds.
private int sessionTimeoutMs;
// Connection timeout, in milliseconds.
private int connectionTimeoutMs;
// Maximum number of retries handed to the retry policy.
private int maxRetries;
// Initial sleep time between retries, in milliseconds.
private int baseSleepTimeMs;
}
Zookeeper 初始化
package com.example.zk.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
/**
* 配置
* @author lanx
* @date 2022/3/6
*/
@Slf4j
@Configuration
public class ZookeeperConfig {
@Autowired
private ZookeeperProperties zookeeperProperties ;
private static CuratorFramework client = null ;
/**
* 初始化
*/
@PostConstruct
public void init (){
//重试策略,初试时间1秒,重试10次
RetryPolicy policy = new ExponentialBackoffRetry(
zookeeperProperties.getBaseSleepTimeMs(),
zookeeperProperties.getMaxRetries());
//通过工厂创建Curator
client = CuratorFrameworkFactory.builder()
.connectString(zookeeperProperties.getServer())
.authorization("digest",zookeeperProperties.getDigest().getBytes())
.connectionTimeoutMs(zookeeperProperties.getConnectionTimeoutMs())
.sessionTimeoutMs(zookeeperProperties.getSessionTimeoutMs())
.retryPolicy(policy).build();
//开启连接
client.start();
log.info("zookeeper 初始化完成...");
}
public static CuratorFramework getClient (){
return client ;
}
public static void closeClient (){
if (client != null){
client.close();
}
}
}
Zookeeper节点数据监听
package com.example.zk.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Registers ZooKeeper watchers at application start-up: a {@link NodeCache} on
 * {@code /lanxi} and a {@link PathChildrenCache} on {@code /llx}.
 *
 * @author lanx
 * @date 2022/3/6
 */
@Component
@Slf4j
public class ZookListening implements ApplicationRunner {

    /**
     * Creates both caches, attaches their listeners and starts them.
     *
     * @param args application arguments (unused)
     * @throws Exception if a cache cannot be started
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Passed to addListener(...) so listener callbacks run on this pool.
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // --- Watch a single node ---
        final NodeCache nodeCache = new NodeCache(ZookeeperConfig.getClient(), "/lanxi", false);
        // Register the listener before starting the cache so no notification is missed.
        nodeCache.getListenable().addListener(() -> {
            ChildData current = nodeCache.getCurrentData();
            if (current == null) {
                // The listener also fires when the node is deleted; there is no data then.
                log.info("Node is deleted or does not exist: {}", "/lanxi");
                return;
            }
            byte[] data = current.getData();
            log.info("Node data is changed, new data: {}",
                    data == null ? "" : new String(data, java.nio.charset.StandardCharsets.UTF_8));
        }, pool);
        // true: build the initial view of the node during start().
        nodeCache.start(true);

        // --- Watch the children of a node ---
        // Third constructor argument: true caches the children's data as well as their paths.
        final PathChildrenCache childrenCache =
                new PathChildrenCache(ZookeeperConfig.getClient(), "/llx", true);
        // Register before start(): with POST_INITIALIZED_EVENT the initial children are
        // reported through events, which a late listener could miss.
        childrenCache.getListenable().addListener((client, event) -> {
            switch (event.getType()) {
                case CHILD_ADDED:
                    log.info("CHILD_ADDED: {}", event.getData().getPath());
                    break;
                case CHILD_REMOVED:
                    log.info("CHILD_REMOVED: {}", event.getData().getPath());
                    break;
                case CHILD_UPDATED:
                    log.info("CHILD_UPDATED: {}", event.getData().getPath());
                    break;
                default:
                    // Connection-state and initialization events are ignored.
                    break;
            }
        }, pool);
        childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    }
}
curator框架,使用 DistributedAtomicInteger 作为分布式计数器
// Curator recipe: DistributedAtomicInteger used as a distributed counter.
RetryNTimes retryPolicy = new RetryNTimes(3, 1000);
DistributedAtomicInteger counter =
        new DistributedAtomicInteger(ZookeeperConfig.getClient(), "/llx", retryPolicy);

// Reset the counter to 0 before incrementing it.
counter.forceSet(0);

// Add 1 and log whether the update succeeded, plus the values after and before it.
AtomicValue<Integer> result = counter.add(1);
log.info("Boolean:{}", result.succeeded());
log.info("最新值:{}", result.postValue());
log.info("原始值:{}", result.preValue());
curator 框架,使用 InterProcessMutex 分布式锁
// Start 10 threads that all contend for the same distributed lock on "/llx".
for (int i = 0; i < 10; i++) {
    new Thread(new Runnable() {
        @Override
        public void run() {
            // Distributed lock backed by the shared Curator client.
            final InterProcessMutex lock = new InterProcessMutex(ZookeeperConfig.getClient(), "/llx");
            // Tracks ownership so release() is only called on a lock this thread holds;
            // releasing an unowned lock fails.
            boolean acquired = false;
            boolean interrupted = false;
            try {
                // Blocks until the lock is obtained.
                lock.acquire();
                acquired = true;
                log.info("线程:{},执行业务逻辑", Thread.currentThread().getName());
                Thread.sleep(1000 * 10);
            } catch (InterruptedException e) {
                // Remember the interrupt; the flag is restored after the lock is released.
                interrupted = true;
                log.warn("Interrupted while holding or waiting for the lock", e);
            } catch (Exception e) {
                log.error("Failed to acquire the lock or run the guarded work", e);
            } finally {
                if (acquired) {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        log.error("Failed to release the lock", e);
                    }
                }
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }).start();
}
Zookeeper操作Service类
package com.example.zk.service;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import java.util.List;
/**
 * Basic ZooKeeper node operations (existence check, create, read, write, list, delete)
 * plus access to a distributed read/write lock.
 */
public interface ZookeeperService {
/**
 * Checks whether a node exists.
 *
 * @param path node path
 * @return {@code true} if the node exists; the implementation in this file also
 *         returns {@code false} when the check itself fails
 */
boolean isExistNode (final String path) ;
/**
 * Creates a node; the implementation in this file also creates missing parent nodes.
 *
 * @param mode create mode of the new node
 * @param path node path
 */
void createNode (CreateMode mode, String path ) ;
/**
 * Sets the data of a node.
 *
 * @param path node path
 * @param nodeData data to store (the implementation in this file encodes it as UTF-8)
 */
void setNodeData (String path, String nodeData) ;
/**
 * Creates a node together with its data; the implementation in this file also
 * creates missing parent nodes.
 *
 * @param mode create mode of the new node
 * @param path node path
 * @param nodeData data to store (the implementation in this file encodes it as UTF-8)
 */
void createNodeAndData (CreateMode mode, String path , String nodeData) ;
/**
 * Reads the data of a node as text.
 *
 * @param path node path
 * @return the node data; the implementation in this file returns {@code null} when
 *         the data is empty or the read fails
 */
String getNodeData (String path) ;
/**
 * Lists the children of a node.
 *
 * @param path node path
 * @return the children reported for the node; the implementation in this file returns
 *         an empty list when the lookup fails
 */
List<String> getNodeChild (String path) ;
/**
 * Deletes a node.
 *
 * @param path node path
 * @param recursive when true, the node's children are deleted as well
 */
void deleteNode (String path,Boolean recursive) ;
/**
 * Returns a distributed read/write lock for the given path.
 *
 * @param path lock path
 * @return the read/write lock
 */
InterProcessReadWriteLock getReadWriteLock (String path) ;
}
package com.example.zk.service.impl;
import com.example.zk.config.ZookeeperConfig;
import com.example.zk.service.ZookeeperService;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
public class ZookeeperServiceImpl implements ZookeeperService {
@Override
public boolean isExistNode(String path) {
CuratorFramework client = ZookeeperConfig.getClient();
client.sync() ;
try {
Stat stat = client.checkExists().forPath(path);
return client.checkExists().forPath(path) != null;
} catch (Exception e) {
log.error("isExistNode error...", e);
e.printStackTrace();
}
return false;
}
@Override
public void createNode(CreateMode mode, String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 递归创建所需父节点
client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
log.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public void setNodeData(String path, String nodeData) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 设置节点数据
client.setData().forPath(path, nodeData.getBytes("UTF-8"));
} catch (Exception e) {
log.error("setNodeData error...", e);
e.printStackTrace();
}
}
@Override
public void createNodeAndData(CreateMode mode, String path, String nodeData) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 创建节点,关联数据
client.create().creatingParentsIfNeeded().withMode(mode)
.forPath(path,nodeData.getBytes("UTF-8"));
} catch (Exception e) {
log.error("createNode error...", e);
e.printStackTrace();
}
}
@Override
public String getNodeData(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
// 数据读取和转换
byte[] dataByte = client.getData().forPath(path) ;
String data = new String(dataByte,"UTF-8") ;
if (!StringUtils.isEmpty(data)){
return data ;
}
}catch (Exception e) {
log.error("getNodeData error...", e);
e.printStackTrace();
}
return null;
}
@Override
public List<String> getNodeChild(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
List<String> nodeChildDataList = new ArrayList<>();
try {
// 节点下数据集
nodeChildDataList = client.getChildren().forPath(path);
} catch (Exception e) {
log.error("getNodeChild error...", e);
e.printStackTrace();
}
return nodeChildDataList;
}
@Override
public void deleteNode(String path, Boolean recursive) {
CuratorFramework client = ZookeeperConfig.getClient() ;
try {
if(recursive) {
// 递归删除节点
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
// 删除单个节点
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
log.error("deleteNode error...", e);
e.printStackTrace();
}
}
@Override
public InterProcessReadWriteLock getReadWriteLock(String path) {
CuratorFramework client = ZookeeperConfig.getClient() ;
// 写锁互斥、读写互斥
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, path);
return readWriteLock ;
}
}
Zookeeper操作Controller类
package com.example.zk.web;
import com.example.zk.service.ZookeeperService;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import static org.apache.zookeeper.CreateMode.PERSISTENT;
/**
 * REST endpoints that forward ZooKeeper node operations to {@link ZookeeperService}.
 *
 * @author lanx
 * @date 2022/3/6
 */
@RestController
public class ZookeeperApi {

    /** Body returned by the endpoints that have no value of their own to report. */
    private static final String SUCCESS = "success";

    @Autowired
    private ZookeeperService zookeeperService;

    /**
     * Returns the data stored at a node.
     *
     * @param path node path
     * @return whatever the service returns for that node
     */
    @GetMapping("/getNodeData")
    public String getNodeData(String path) {
        String data = zookeeperService.getNodeData(path);
        return data;
    }

    /**
     * Reports whether a node exists.
     *
     * @param path node path
     * @return the service's answer
     */
    @GetMapping("/isExistNode")
    public boolean isExistNode(final String path) {
        boolean exists = zookeeperService.isExistNode(path);
        return exists;
    }

    /**
     * Creates a persistent node.
     *
     * @param path node path
     * @return always {@code "success"}
     */
    @GetMapping("/createNode")
    public String createNode(String path) {
        zookeeperService.createNode(PERSISTENT, path);
        return SUCCESS;
    }

    /**
     * Sets the data of a node.
     *
     * @param path node path
     * @param nodeData data to store
     * @return always {@code "success"}
     */
    @GetMapping("/setNodeData")
    public String setNodeData(String path, String nodeData) {
        zookeeperService.setNodeData(path, nodeData);
        return SUCCESS;
    }

    /**
     * Creates a persistent node together with its data.
     *
     * @param path node path
     * @param nodeData data to store
     * @return always {@code "success"}
     */
    @GetMapping("/createNodeAndData")
    public String createNodeAndData(String path, String nodeData) {
        zookeeperService.createNodeAndData(PERSISTENT, path, nodeData);
        return SUCCESS;
    }

    /**
     * Lists the children of a node.
     *
     * @param path node path
     * @return the list returned by the service
     */
    @GetMapping("/getNodeChild")
    public List<String> getNodeChild(String path) {
        List<String> children = zookeeperService.getNodeChild(path);
        return children;
    }

    /**
     * Deletes a node, optionally together with its children.
     *
     * @param path node path
     * @param recursive passed through to the service unchanged
     * @return always {@code "success"}
     */
    @GetMapping("/deleteNode")
    public String deleteNode(String path, Boolean recursive) {
        zookeeperService.deleteNode(path, recursive);
        return SUCCESS;
    }
}
接口测试
http://127.0.0.1:8088/createNode?path=/llx/222
|