IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁 -> 正文阅读

[大数据]ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

作者:Grey

原文地址: ZooKeeper学习笔记四:使用ZooKeeper实现一个简单的分布式锁

前置知识

完成ZooKeeper集群搭建以及熟悉ZooKeeperAPI基本使用

需求

当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

在单机情况下,可以使用JUC包里面的工具来进行互斥控制。

但是在分布式系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机并发控制锁策略失效,为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁的由来。

当多个进程不在同一个系统中,就需要用分布式锁控制多个进程对资源的访问。

我们可以用ZooKeeper来模拟实现一个简单的分布式锁

环境准备

一个zk集权,ip和端口分别为:

  • 192.168.205.145:2181
  • 192.168.205.146:2181
  • 192.168.205.147:2181
  • 192.168.205.148:2181

定义主方法

App.java

public class App {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                ZkLock lock = new ZkLock();
                lock.lock(); // 开启锁
                System.out.println(Thread.currentThread().getName() + " doing work");
                lock.release(); // 释放锁
            }).start();
        }
        while (true) {
        }
    }
}

如上,我们设计了一个ZkLock,其中lock方法是锁定资源,release方法是释放资源,我们并发了10个线程并发访问来模拟。

public class ZkLock implements AsyncCallback.StringCallback, Watcher, AsyncCallback.StatCallback, AsyncCallback.Children2Callback {
    private CountDownLatch latch;
    private ZooKeeper zk;
    private String identify;
    private String lockPath;
    private String pathName;

    public ZkLock() {
        identify = Thread.currentThread().getName();
        lockPath = "/lock";
        latch = new CountDownLatch(1);
        zk = ZookeeperConfig.create(ADDRESS + "/testLock");
    }

    public void lock() {
        try {
            zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void release() {
        try {
            zk.delete(pathName, -1);
            System.out.println(identify + " over work....");
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }


    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (null != name) {
            // 创建成功
            System.out.println(identify + " created " + name);
            pathName = name;
            zk.getChildren("/", false, this, "dasdfas");
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

        sort(children);
        int i = children.indexOf(pathName.substring(1));
        if (i == 0) {
            // 是第一个,获得锁,可以执行
            System.out.println(identify + " first...");
            try {
                zk.setData("/", identify.getBytes(UTF_8), -1);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
        } else {
            zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
        }

    }


    @Override
    public void process(WatchedEvent event) {
        switch (event.getType()) {
            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                zk.getChildren("/", false, this, "sdf");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
        }
    }

    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {

    }
}

关于上述代码的说明,我们规定创建的zk目录为/testLock,所以我们可以通过zk客户端在集群中先把/testLock目录建好,后续线程争抢的时候,我们只需要创建序列化的临时节点(以/lock开头),因为是序列化的,所以我们可以设置让第一个创建好节点的线程抢到锁,其他的线程排队等待。

所以lock方法实现如下:

zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());

lock方法在执行的时候,会有一个回调,即:当节点创建成功后,会判断/testLock节点中有没有已经创建好的且在当前节点之前的节点,有的话,则注册一个一个对于/testLock目录的监听:

    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (null != name) {
            // 创建成功
            System.out.println(identify + " created " + name);
            pathName = name;
            zk.getChildren("/", false, this, "dasdfas");
        }
    }

一旦发现/testLock目录下已经有节点了,那么我们拿到/testLock下的所有节点,并排序,取最小的那个节点执行即可:

  @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {

        sort(children);
        int i = children.indexOf(pathName.substring(1));
        if (i == 0) {
            // 是第一个,获得锁,可以执行
            System.out.println(identify + " first...");
            try {
                zk.setData("/", identify.getBytes(UTF_8), -1);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
        } else {
            zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
        }

    }

release方法很简单,只需要把当前执行完毕的节点删除即可:

    public void release() {
        try {
            zk.delete(pathName, -1);
            System.out.println(identify + " over work....");
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

运行效果

确保zk中有/testLock这个节点,如果没有,请先创建一个:

Run App.java

可以看到控制台输出:

Thread-5 created /lock0000000000
Thread-4 created /lock0000000001
Thread-1 created /lock0000000002
Thread-9 created /lock0000000003
Thread-6 created /lock0000000004
Thread-2 created /lock0000000005
Thread-3 created /lock0000000006
Thread-0 created /lock0000000007
Thread-8 created /lock0000000008
Thread-7 created /lock0000000009
Thread-5 first...
Thread-5 doing work
Thread-5 over work....
Thread-4 first...
Thread-4 doing work
Thread-4 over work....
Thread-1 first...
Thread-1 doing work
Thread-1 over work....
Thread-9 first...
Thread-9 doing work
Thread-9 over work....
Thread-6 first...
Thread-6 doing work
Thread-6 over work....
Thread-2 first...
Thread-2 doing work
Thread-2 over work....
Thread-3 first...
Thread-3 doing work
Thread-3 over work....
Thread-0 first...
Thread-0 doing work
Thread-0 over work....
Thread-8 first...
Thread-8 doing work
Thread-8 over work....
Thread-7 first...
Thread-7 doing work
Thread-7 over work....

完整代码

Github

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-10 12:28:04  更:2021-11-10 12:29:32 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 0:22:48-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码