IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> zookeeper 实现分布式锁 -> 正文阅读

[大数据]zookeeper 实现分布式锁

1、 自定义锁工具类

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * @Description:
 * @Author: tanjunjun
 * @CreateDate: 2021/7/16 10:28
 * @Version: 1.0
 * @menu
 */
public class DistributeLock {

    private final ZooKeeper zk;

    private CountDownLatch connectLatch = new CountDownLatch(1);

    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String watchPath;
    private String currentNode;

    public DistributeLock() throws IOException, InterruptedException, KeeperException {
        // 获取连接
        zk = new ZooKeeper("127.0.0.1:2181", 2000, watchedEvent -> {
                // 如果连接上zk connectLatch释放
                if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){
                    connectLatch.countDown();
                }
                // waitLatch 释放 如果是删除节点,且是上一个节点,就释放调
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(watchPath)) {
                    waitLatch.countDown();
                }

        });

        // 等待连接后,再往下走
        connectLatch.await();

        // 判断根节点/locks是否存在
        Stat exists = zk.exists("/locks", false);
        if (exists == null) {
            zk.create("/locks","locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    // 对zk加锁
    public void zkLock() throws KeeperException, InterruptedException {
        // 创建对应的临时带序号的节点
        currentNode = zk.create("/locks/" + "seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        // 判断自己是不是最小序号节点,如果不是,监听上一个节点
        List<String> children = zk.getChildren("/locks", false);
        // 如果只有一个节点,说明就是当前节点
        if (children.size() == 1){
            return;
        }else {
            // 取出前一个节点
            Collections.sort(children);
            // 获取节点名称
            String thisNode = currentNode.substring("/locks/".length());
            // 获取在集合中的位置
            int index = children.indexOf(thisNode);
            if (index == -1) {
                System.out.println("数据异常");
            } else if (index == 0) {
                // 就一个节点,可以直接获取
                return;
            } else {
                // 需要获取前一个节点变化
                watchPath = "/locks/"+ children.get(index -1);
                zk.getData(watchPath,true,null);

                // 等待
                waitLatch.await();
            }

        }
    }

    // 解锁
    public void unZkLock(){
        // 删除节点
        try {
            zk.delete(currentNode,-1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
  1. 测试自定义
public class DistributeLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
          final   DistributeLock lock1 = new DistributeLock();
          final   DistributeLock lock2 = new DistributeLock();

        new Thread(() -> {
            try {
                lock1.zkLock();
                System.out.println("线程1 启动获取到锁!");
                test(1);
                Thread.sleep(5*1000);
                lock1.unZkLock();
                System.out.println("线程1 释放锁!");
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                lock2.zkLock();
                System.out.println("线程2 启动获取到锁!");
                test(2);
                Thread.sleep(5*1000);
                lock2.unZkLock();
                System.out.println("线程2 释放锁!");
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }).start();


    }


    public static void test(Integer num){
        System.out.println(num);
    }
}
  1. 使用Curator 框架
public class CuratorTest {
    public static void main(String[] args) {
        // 创建分布式锁一
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");

        // 创建分布式锁二
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");

        new Thread(() -> {
            try {
                lock1.acquire();
                System.out.println("lock1 获取锁");

                Thread.sleep(5000);

                lock1.release();
                System.out.println("lock1 释放锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                lock2.acquire();
                System.out.println("lock2 获取锁");

                Thread.sleep(5000);

                lock2.release();
                System.out.println("lock2 释放锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    private static CuratorFramework getCuratorFramework() {
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000, 3);
        CuratorFramework build = CuratorFrameworkFactory.builder().connectString("127.0.1.1:2181")
                .connectionTimeoutMs(2000)
                .retryPolicy(exponentialBackoffRetry)
                .build();
        build.start();
        System.out.println("zk 连接成功");
        return build;
    }


}

  1. pom.xml
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.9</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.16</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>

        <!--   curator     -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>4.3.0</version>
        </dependency>
    </dependencies>
  1. log4sj
### 设置###
log4j.rootLogger = debug,stdout,D,E

### 输出信息到控制抬 ###
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n

### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = E://logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG 
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n

### 输出ERROR 级别以上的日志到=E://logs/error.log ###
log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
log4j.appender.E.File =E://logs/error.log 
log4j.appender.E.Append = true
log4j.appender.E.Threshold = ERROR 
log4j.appender.E.layout = org.apache.log4j.PatternLayout
log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-17 11:59:21  更:2021-07-17 12:01:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年4日历 -2024/4/24 17:43:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码