利用zk实现分布式锁的原理:

curator框架对分布式锁的实现:
//获取zookpeer连接
CuratorFramework curatorFramework = getCuratorFramework();
//创建分布式锁
InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/lock");
//加锁
interProcessMutex.acquire();
//释放锁
interProcessMutex.release();
首先进入acquire()方法
public void acquire() throws Exception {
//internalLock方法是主要的加锁实现
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
继续进入internalLock方法,该方法主要是实现锁的可重入操作,对获取到锁的线程存入map中,保证可重入。(总结:curator会构造一个map,每次加锁成功后,把锁存入map,key是当前加锁线程,value是加锁的信息,每次获取锁的时候,首先尝试是否能在map中拿到,如果拿到了,直接对锁的计数器加一。进行获取锁的操作在attemptLock方法,见下一个方法详解)?
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//利用当前线程作为key 去map中查是否已经存在锁,为了锁的可重入
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
//如果不是空 直接对锁的计数器 加一
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
} else {
//如果加锁的map里没查到,就进行获取锁的操作(见下面详解)
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
//获取锁成功后,用当前线程作为key,锁的节点作为value,存map里
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
//----------------
//存放锁的map
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
//锁的信息结构
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount;
private LockData(Thread owningThread, String lockPath) {
this.lockCount = new AtomicInteger(1);
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
接上一步,来看一下internals.attemptLock方法。(总结:该方法主要是利用死循环来尝试获取锁,创建一个容器节点作为加锁节点的根路径,并且会创建一个临时顺序节点,使用临时节点好处是所有加锁节点都被删除后,该容器节点也会随之消失,节省不必要的内存浪费。创建好临时顺序节点后,真正的加锁逻辑在createsTheLock方法中,见下一个方法详解)
/**
* 加锁方法
*/
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
//在一个while死循环里,阻塞式的获取锁,直到获取成功为止
while(!isDone) {
isDone = true;
try {
//判断父节点是否存在,如果不存在就创建一个容器节点,并在该节点下创建临时顺序节点
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
//真正的加锁方法(加下面详解)
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
接着终于到了真正加锁逻辑的internalLockLoop方法。(总结:拿到父容器节点下面的所有临时顺序节点,进行一个排序,看看当前我们创建的这个节点是不是所有临时顺序节点中最小的那一个。如果是就代表获取锁成功,如果不是最小的,代表获取锁失败,构造一个监听器用于监听当前节点的前一个节点,并利用synchronized+wait的方式,让当前线程处于阻塞状态。)
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
//拿到所有子节点并进行排序
List<String> children = this.getSortedChildren();
//利用结对路径截取出刚刚创建的子节点的名称
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
//判断当前的节点在所有的子节点中是不是最小的,如果是代表获取锁成功;否则获取锁失败
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
//如果true,则代表加锁成功
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {//如果false,代表加锁失败
//拼接被监听节点的上一个节点的全路径,因为监听一个节点需要该节点全路径
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
//利用synchronized+wait实现线程等待
synchronized(this) {
try {
//对当前节点添加一个监听器,监听前一个节点的状态,如果发生变化,就利用notifyAll,释放所有线程等待
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if (millisToWait == null) {
//没设置超时时间,直接等待
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
//设置了超时时间,超时等待
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
以上就是curator框架操作zookeeper实现分布式锁的具体源码。
场景:
1、如果某个节点监听了他前一个节点,但是他前一个节点由于session断连,被zookpeer删除了,此时会出发当前节点的监听,当前节点又会走上面讲的加锁逻辑判断,发现自己不是最小的节点,又会加锁失败,并监听比当前节点小的上一个节点,就会把由于session断连的那个节点剔除,不会影响锁的正常获取。
2、由于curator框架在操作zookpeer时,发送命令和处理监听响应是两个线程,所以可能出现当前节点创建的命令已经发出,并在zookpeer服务器上成功创建该节点,但响应线程由于网络问题,没有收到,由于curator的重试机制,就会再次发起创建命令,就会重复创建加锁节点。但curator会利用protectioned模式来解决这一问题,就是在所创建节点的名字前面加上UUID作为前缀,重试操作时,会先判断有没有UUID为前缀的节点,有了就不会重复创建。
其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。
|