IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分布式锁——基于Zookeeper -> 正文阅读

[大数据]分布式锁——基于Zookeeper

目录

一、分布式锁

二、Zookeeper实现分布式锁的原理

三、代码


一、分布式锁

我们知道,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,使用synchronized关键字或者使用Lock锁,但是这些只是针对单个应用,也就是只能在同一个JVM生效。随着业务发展,单机应用已经不能满足我们的需要,我们需要集群,分布式,这个时候就需要考虑到分布式锁。

二、Zookeeper实现分布式锁的原理

  • 根据zk临时节点的唯一性,当多个请求同时创建相同的节点,只要谁能够创建成功 谁就能够获取到锁。
  • 在创建节点时,如果该节点已经被其他请求创建则进入等待。
  • 只要能够创建节点成功,就认为获取到了锁,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;
  • 正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。

三、代码

package com.xiaojie.template;

/**
 * 分布式锁
 */
public abstract class DistributeLock {

    public void getLock() {
        //获取锁
        if (tryLock()) {
            System.out.println(Thread.currentThread().getName() + "获取锁成功");
        } else {
            //等待锁
            waitLock();
            //重新获取
            getLock();
        }
    }

    /**
     * 等待锁
     */
    protected abstract void waitLock();

    /**
     * 尝试获取锁
     */
    protected abstract boolean tryLock();

    /**
     * 释放锁
     */
    public abstract void unLock();
}
package com.xiaojie.template;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.concurrent.CountDownLatch;

/**
 * 使用Zk获取分布式锁
 */
public class ZkDistributeLock extends DistributeLock {

    //参数1 连接地址
    private static final String ADDRESS = "192.168.139.154:2181,192.168.139.154:2182,192.168.139.154:2183";
    // 参数2 zk超时时间
    private static final int TIMEOUT = 5000;
    // 创建我们的zk连接
    private ZkClient zkClient = new ZkClient(ADDRESS, TIMEOUT);
    /**
     * 共同的创建临时节点
     */
    private String lockPath = "/myLock";
    private CountDownLatch countDownLatch = null;

    @Override
    protected void waitLock() {
/**
 * 使用事件通知,如果有节点删除,则重新开始竞争锁
 */
        IZkDataListener iZkDataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String s, Object o) throws Exception {

            }

            @Override
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    //如果删除节点,countDownLatch变为0,开始竞争锁
                    countDownLatch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(lockPath, iZkDataListener);
        // 2.使用countDownLatch等待
        if (countDownLatch == null) {
            countDownLatch = new CountDownLatch(1);
            System.out.println("开始等待锁。。。。。。。。。。");
        }
        try {
            // 如果当前计数器不是为0 就一直等待
            countDownLatch.await();
        } catch (Exception e) {

        }
        // 3. 如果当前节点被删除的情况下,需要重新进入到获取锁
        zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
    }

    /**
     * 尝试获取锁
     */
    @Override
    public boolean tryLock() {
        // 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁
        try {
            zkClient.createEphemeral(lockPath);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * 释放锁。关闭临时节点,其他资源去竞争锁
     */
    @Override
    public void unLock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println(Thread.currentThread().getName() + ",释放了锁>>>");
        }
    }
}
package com.xiaojie.utils;

/**
 * 雪花算法生成id
 */
public class SnowFlake {

    /**
     * 起始的时间戳
     */
    private final static long START_STMP = 1480166465631L;

    /**
     * 每一部分占用的位数
     */
    private final static long SEQUENCE_BIT = 12; //序列号占用的位数
    private final static long MACHINE_BIT = 5;   //机器标识占用的位数
    private final static long DATACENTER_BIT = 5;//数据中心占用的位数

    /**
     * 每一部分的最大值
     */
    private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
    private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
    private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);

    /**
     * 每一部分向左的位移
     */
    private final static long MACHINE_LEFT = SEQUENCE_BIT;
    private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
    private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;

    private long datacenterId;  //数据中心
    private long machineId;     //机器标识
    private long sequence = 0L; //序列号
    private long lastStmp = -1L;//上一次时间戳

    public SnowFlake() {
    }

    public SnowFlake(long datacenterId, long machineId) {

        if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
            throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
        }
        if (machineId > MAX_MACHINE_NUM || machineId < 0) {
            throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
        }
        this.datacenterId = datacenterId;
        this.machineId = machineId;
    }

    /**
     * 产生下一个ID
     *
     * @return
     */
    public long nextId() {
        long currStmp = getNewstmp();
        if (currStmp < lastStmp) {
            throw new RuntimeException("Clock moved backwards.  Refusing to generate id");
        }
        if (currStmp == lastStmp) {
            //相同毫秒内,序列号自增
            sequence = (sequence + 1) & MAX_SEQUENCE;
            //同一毫秒的序列数已经达到最大
            if (sequence == 0L) {
                currStmp = getNextMill();
            }
        } else {
            //不同毫秒内,序列号置为0
            sequence = 0L;
        }
        lastStmp = currStmp;
        return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
                | datacenterId << DATACENTER_LEFT       //数据中心部分
                | machineId << MACHINE_LEFT             //机器标识部分
                | sequence;                             //序列号部分
    }

    private long getNextMill() {
        long mill = getNewstmp();
        while (mill <= lastStmp) {
            mill = getNewstmp();
        }
        return mill;
    }

    private long getNewstmp() {
        return System.currentTimeMillis();
    }

}
package com.xiaojie.utils;

import com.xiaojie.template.DistributeLock;
import com.xiaojie.template.ZkDistributeLock;

public class IdGenerate implements Runnable {
    private DistributeLock distributeLock = new ZkDistributeLock();

    private SnowFlake snowFlake = new SnowFlake();

    @Override
    public void run() {
        String s = genId();
    }

    public String genId() {
        try {
            distributeLock.getLock();
            //获取到锁执行业务,获取不到等待
            String id = String.valueOf(snowFlake.nextId());
            System.out.println(">>>>>>>>>>>" + id);
            return id;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            distributeLock.unLock();
        }
        return null;
    }

}
public class Test {
    public static void main(String[] args) {
//        IdGenerate idGenerate = new IdGenerate();
        for (int i = 0; i < 100; i++) {
            new Thread(new IdGenerate()).start();
        }
    }
}

参考:https://blog.csdn.net/weixin_44455476/article/details/105397576

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:08:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:57:29-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码