IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> zookeeper实现分布式锁源码解析 -> 正文阅读

[大数据]zookeeper实现分布式锁源码解析

利用zk实现分布式锁的原理:

curator框架对分布式锁的实现:

        //获取zookpeer连接
        CuratorFramework curatorFramework = getCuratorFramework();

        //创建分布式锁
        InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, "/lock");
        //加锁
        interProcessMutex.acquire();
        //释放锁
        interProcessMutex.release();

首先进入acquire()方法

public void acquire() throws Exception {
        //internalLock方法是主要的加锁实现
        if (!this.internalLock(-1L, (TimeUnit)null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
        }
    }

继续进入internalLock方法,该方法主要是实现锁的可重入操作,对获取到锁的线程存入map中,保证可重入。(总结:curator会构造一个map,每次加锁成功后,把锁存入map,key是当前加锁线程,value是加锁的信息,每次获取锁的时候,首先尝试是否能在map中拿到,如果拿到了,直接对锁的计数器加一。进行获取锁的操作在attemptLock方法,见下一个方法详解)?

private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //利用当前线程作为key 去map中查是否已经存在锁,为了锁的可重入
        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
        //如果不是空  直接对锁的计数器 加一
        if (lockData != null) {
            lockData.lockCount.incrementAndGet();
            return true;
        } else {
            //如果加锁的map里没查到,就进行获取锁的操作(见下面详解)
            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
            if (lockPath != null) {
                //获取锁成功后,用当前线程作为key,锁的节点作为value,存map里
                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
                this.threadData.put(currentThread, newLockData);
                return true;
            } else {
                return false;
            }
        }
    }

//----------------
//存放锁的map
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData;
//锁的信息结构
private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount;

        private LockData(Thread owningThread, String lockPath) {
            this.lockCount = new AtomicInteger(1);
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

接上一步,来看一下internals.attemptLock方法。(总结:该方法主要是利用死循环来尝试获取锁,创建一个容器节点作为加锁节点的根路径,并且会创建一个临时顺序节点,使用临时节点好处是所有加锁节点都被删除后,该容器节点也会随之消失,节省不必要的内存浪费。创建好临时顺序节点后,真正的加锁逻辑在createsTheLock方法中,见下一个方法详解)

/**
 * 加锁方法
 */
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        long startMillis = System.currentTimeMillis();
        Long millisToWait = unit != null ? unit.toMillis(time) : null;
        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        //在一个while死循环里,阻塞式的获取锁,直到获取成功为止
        while(!isDone) {
            isDone = true;

            try {
                //判断父节点是否存在,如果不存在就创建一个容器节点,并在该节点下创建临时顺序节点
                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
                //真正的加锁方法(加下面详解)
                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (NoNodeException var14) {
                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    throw var14;
                }

                isDone = false;
            }
        }

        return hasTheLock ? ourPath : null;
    }

接着终于到了真正加锁逻辑的internalLockLoop方法。(总结:拿到父容器节点下面的所有临时顺序节点,进行一个排序,看看当前我们创建的这个节点是不是所有临时顺序节点中最小的那一个。如果是就代表获取锁成功,如果不是最小的,代表获取锁失败,构造一个监听器用于监听当前节点的前一个节点,并利用synchronized+wait的方式,让当前线程处于阻塞状态。)

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;

        try {
            if (this.revocable.get() != null) {
                ((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
            }

            while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
                //拿到所有子节点并进行排序
                List<String> children = this.getSortedChildren();
                //利用结对路径截取出刚刚创建的子节点的名称
                String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
                //判断当前的节点在所有的子节点中是不是最小的,如果是代表获取锁成功;否则获取锁失败
                PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
                //如果true,则代表加锁成功
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {//如果false,代表加锁失败
                    //拼接被监听节点的上一个节点的全路径,因为监听一个节点需要该节点全路径
                    String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
                    //利用synchronized+wait实现线程等待
                    synchronized(this) {
                        try {
                            //对当前节点添加一个监听器,监听前一个节点的状态,如果发生变化,就利用notifyAll,释放所有线程等待
                            ((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
                            if (millisToWait == null) {
                                //没设置超时时间,直接等待
                                this.wait();
                            } else {
                                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                if (millisToWait > 0L) {
                                    //设置了超时时间,超时等待
                                    this.wait(millisToWait);
                                } else {
                                    doDelete = true;
                                    break;
                                }
                            }
                        } catch (NoNodeException var19) {
                        }
                    }
                }
            }
        } catch (Exception var21) {
            ThreadUtils.checkInterrupted(var21);
            doDelete = true;
            throw var21;
        } finally {
            if (doDelete) {
                this.deleteOurPath(ourPath);
            }

        }

        return haveTheLock;
    }

以上就是curator框架操作zookeeper实现分布式锁的具体源码。

场景:

1、如果某个节点监听了他前一个节点,但是他前一个节点由于session断连,被zookpeer删除了,此时会出发当前节点的监听,当前节点又会走上面讲的加锁逻辑判断,发现自己不是最小的节点,又会加锁失败,并监听比当前节点小的上一个节点,就会把由于session断连的那个节点剔除,不会影响锁的正常获取。

2、由于curator框架在操作zookpeer时,发送命令和处理监听响应是两个线程,所以可能出现当前节点创建的命令已经发出,并在zookpeer服务器上成功创建该节点,但响应线程由于网络问题,没有收到,由于curator的重试机制,就会再次发起创建命令,就会重复创建加锁节点。但curator会利用protectioned模式来解决这一问题,就是在所创建节点的名字前面加上UUID作为前缀,重试操作时,会先判断有没有UUID为前缀的节点,有了就不会重复创建。

其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-07 12:09:17  更:2021-08-07 12:11:31 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/17 19:23:37-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码