相关命令
create [-s] [-e] [-c] [-t ttl] path [data] [acl]
ls [-s] [-w] [-R] path
delete [-v version] path
实现思路一
- 在指定目录下,创建有序且临时节点,客户端根据自身创建的节点是否是 “序号最小”的节点进行判断
- 如果是 “序号最小” 的节点,则认为当前客户端 拥有 LOCK
- 如果非 “序号最小” 的节点,则认为当前客户端 无 LOCK
- 业务代码执行完毕后,释放LOCK操作,即 delete 该节点
- 其他机器都监听前一名节点,监听到 (EventType.NodeDeleted) 事件后,查询新的节点列表,判断当前实例是否属于第一节点
思路一考虑点
- 节点1释放锁,当 watcher 通知到其他客户端失败,其他实例会一直认为节点1还拿着锁
- 当实例3创建临时节点后,不打算争锁了,需要 delete -path ,如果delete失败,这个节点会一直留着等到该实例重启
实现思路二
- 在指定目录下,创建有序且设置TTL的节点,客户端根据自身创建的节点是否是 “序号最小”的节点进行判断
- 如果是 “序号最小” 的节点,则认为当前客户端 拥有 LOCK
- 如果非 “序号最小” 的节点,则认为当前客户端 无 LOCK
- 业务代码执行完毕后,释放LOCK操作,即 delete 该节点
- 定时查询,节点列表,判断当前实例是否属于第一节点
思路二考虑点
- 争锁场景,客户端数量不会特别多,锁到的线程不会在轮询查;仅锁不到的线程会轮询查,如果再搭配一个“尝试获取锁时间”概念,在大部分场景可以接受了。
- 需要对 zoo.cfg 开启配置 “extendedTypesEnabled=true” 以开启 TTL 功能
代码示例实现
public class ZookeeperLockTest {
private static ZooKeeper zooKeeper;
static {
try {
zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, new ConnectWatcher(), new ZKClientConfig());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
long maxWaitTime = 4000;
long maxLockedTime = 30000;
String key = "/zk_temp";
execute(key, maxWaitTime, maxLockedTime, new ZookeeperLockCallback() {
@Override
public void doWithLock(boolean locked) throws InterruptedException {
if (!locked) {
System.out.println("等了" + maxWaitTime + "还是没拿到锁");
return;
}
System.out.println("锁住了,开始处理业务");
Thread.sleep(10000);
}
});
}
private static void execute(String key, long maxWaitTime, long maxLockedTime, ZookeeperLockCallback callback) {
if (maxWaitTime < 0 || maxLockedTime < 0 || maxWaitTime > zooKeeper.getSessionTimeout()) {
throw new IllegalArgumentException("参数格式不正确");
}
String currentPath = null;
long maxLockedTimeMillis = System.currentTimeMillis() + maxLockedTime;
try {
currentPath = zooKeeper.create(key + "/", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL,
null, maxLockedTime);
List<String> children = zooKeeper.getChildren(key, false);
String firstChildren = key + "/" + children.stream().sorted().findFirst().orElse(null);
long maxWaitTimeMillis = System.currentTimeMillis() + maxWaitTime;
while (!Objects.equals(currentPath, firstChildren)) {
if (maxWaitTimeMillis > System.currentTimeMillis()) {
Thread.sleep(100);
} else {
break;
}
children = zooKeeper.getChildren(key, false);
firstChildren = key + "/" + children.stream().sorted().findFirst().orElse(null);
}
callback.doWithLock(Objects.equals(currentPath, firstChildren));
} catch (Exception e) {
e.printStackTrace();
} finally {
if (maxLockedTimeMillis > System.currentTimeMillis() && currentPath != null) {
try {
zooKeeper.delete(currentPath, 0);
System.out.println("业务退出,释放锁");
} catch (Exception e) {
System.out.println("释放锁异常" + e.getMessage());
}
} else {
System.out.println("锁早被自动释放了");
}
}
}
}
@FunctionalInterface
public interface ZookeeperLockCallback {
void doWithLock(boolean locked) throws InterruptedException;
}
|