IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 四 Zookeeper分布式锁机制及其Java API 原生实现 Curator框架实现 -> 正文阅读

[大数据]四 Zookeeper分布式锁机制及其Java API 原生实现 Curator框架实现

四, Zookeeper 分布式锁机制和代码实现

4.1 Zookeeper 分布式锁机制

  • 分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的一致性。

[为什么Zookeeper可以实现分布式锁?]

在这里插入图片描述
在这里插入图片描述

[分布式锁实现原理]

在这里插入图片描述

对实现过程的解读:

  1. zookeeper会在它的集群内维护一个永久根节点, 我们可以命名为/locks, 这个根节点的每个子节点维护着每台客户端向zookeeper申请的锁;
  2. 代表这个锁的节点,是一个临时节点并且带有序号, 客户端每次申请锁都要先创建一个这样的节点;
  3. 当客户端A向zookeeper申请锁时, 先在/locks下面创建一个临时带序号节点, 然后判断这个节点是否是/locks子节点集合(这个集合必须先排序)中的第一个节点, 如果是的话就直接得到锁.
  4. 当客户端B向zookeeper申请锁时, 也要在/locks下面创建一个临时带序号节点, 然后判断判断这个节点是否是/locks子节点集合(这个集合必须先排序)中的第一个节点, 如果不是, 就获取到这个节点的前一个节点(子节点集合中这个节点的前一个节点), 对这个前一个节点设置监听器, 等监听到这个前一个节点被删除了, 那么代表客户端B的这个节点就可以获得锁了.
  5. 再多的客户端都是如此判断.

其实如果有客户端C、客户端D等N个客户端争抢一个zk分布式锁,原理都是类似的。

  • 大家都是上来直接创建一个锁节点下的子节点
  • 如果自己不是第一个节点,就对自己上一个节点加监听器只要上一个节点释放锁(节点被删除了),自己就排到前面去了,相当于是一个排队机制。
  • 而且用临时顺序节点的另外一个用意就是,如果某个客户端创建临时顺序节点之后,不小心自己宕机了也没关系,zk感知到那个客户端宕机,会自动删除对应的临时顺序节点,相当于自动释放锁,或者是自动取消自己的排队。

详细介绍:ZooKeeper分布式锁的实现原理

4.2 使用Java API 原生实现Zookeeper 分布式锁

不跟你多bb, 直接上代码

  • 对zookeeper连接(包括连接, 以及对连接事件, 上一个节点删除事件的监听), 加锁, 解锁的实现
  • DistributeLock.java
package cn.qsc.zkcase2;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class DistributeLock {

    private final String connectString = "bigdata01:2181,bigdata02:2181,bigdata03:2181";
    private final int sessiontimeout = 2000;
    private final ZooKeeper zk;

    //ZooKeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);
    //ZooKeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    //当前节点的上一个节点
    private String preNodePath;
    private String currentNode;

    //创建节点的路径
    private String rootNode = "locks";
    private String subNode = "seq-";

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        //1. 与zk服务器建立连接
        zk = new ZooKeeper(connectString, sessiontimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //总共有两次监听事件需要处理
                //1. 建立连接之后
                // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
                if  (watchedEvent.getState()  == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了 preNodePath 的删除事件
                if  (watchedEvent.getType()  == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(preNodePath))
                {
                    waitLatch.countDown();
                }
            }

        });

        //确保与zk建立连接执行完成后, 才继续往下执行, 提高代码的健壮性
        connectLatch.await();

        //2. 确保根节点是否存在
        Stat stat = zk.exists("/"+rootNode, false);

        if( stat == null){
            zk.create("/"+rootNode, rootNode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }


    //加锁(获取根节点的一个子节点)
    public void zkLock(){

        //1. 创建对应的临时带序号节点
        try {
            currentNode = zk.create("/locks"+"/seqp-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("创建了当前节点currentNode: "+ currentNode);
            Thread.sleep(10);  wait 一小会, 让结果更清晰一些
            //2. 判断.
            /首先获取locks的所有子节点并判断, 当前locks下的子节点是否只有一个, 是的话, 那肯定当前创建的节点就是唯一的节点, 直接获取锁即可
            如果不止一个, 则把所有的节点进行排序后, 获取当前节点的index是否为0
            List<String> children = zk.getChildren("/"+rootNode, false);
            System.out.println("创建后我就获取了子节点"+children);
            //locks 下面只有一个子节点, 直接获取锁即可
            if(children.size() == 1){
                return;
            }else{
                //locks 下面不止一个子节点, 排序然后判断当前节点的index值
                Collections.sort(children);
                System.out.println("当前所有节点为:"+children);
                // 易错点!!!!!!

                //当前节点的 名称
                String currentNodeName = currentNode.substring(("/"+rootNode+"/").length());

                ///当前节点名称即currentNodeName的index
                int index = children.indexOf(currentNodeName);


                System.out.println("当前节点:"+ currentNode +", 当前节点的名称: "+currentNodeName+", " +
                        " 当前节点的index为:" + index);
                if(index == -1 ){
                    System.out.println("数据有误");
                }else if (index == 0){
                    return;
                }else{
                    //当前节点前面还有节点, 对上一个节点设置监听器
                    preNodePath = ("/"+rootNode+"/")+children.get(index -1);
                    System.out.println("======前一个节点的地址: "+preNodePath);
                    System.out.println("currentNode为: "+ currentNode + "preNodePath="+preNodePath);
                    zk.getData(preNodePath, true, new Stat());

                    //等待监听完成(上一个节点被删除)
                    waitLatch.await();

                    return;
                }
            }



            //3. 如果不是最小节点, 对上一个节点设置监听器
            这个步骤要到zk连接方法中设置噢

        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }


    //解锁
    public void unzkLock(){
        //删除节点
        try {
            zk.delete(currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}


  • 两个线程对应于两个分布式锁对象, 去分别开启线程获得锁, 释放锁
  • DistributeLockTest.java
package cn.qsc.zkcase2;

import org.apache.zookeeper.KeeperException;

import java.io.IOException;

public class DistributeLockTest {
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributeLock lock1 = new DistributeLock();

        DistributeLock lock2 = new DistributeLock();


        //新建两个线程去获取锁
        new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    lock1.zkLock();
                    System.out.println("线程1获取到锁");
                    Thread.sleep(5000);

                    lock1.unzkLock();
                    System.out.println("线程1释放锁");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();


        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                lock2.zkLock();
                System.out.println("线程2获取到锁");

                    Thread.sleep(5000);

                    lock2.unzkLock();
                    System.out.println("线程2释放锁");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

    }
}

4.3 使用Curator框架实现Zookeeper分布式锁(待总结)

参见此文

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:42:40  更:2021-07-31 16:44:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/4 23:13:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码