目录
一、分布式锁
二、Zookeeper实现分布式锁的原理
三、代码
一、分布式锁
我们知道,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,使用synchronized关键字或者使用Lock锁,但是这些只是针对单个应用,也就是只能在同一个JVM生效。随着业务发展,单机应用已经不能满足我们的需要,我们需要集群,分布式,这个时候就需要考虑到分布式锁。
二、Zookeeper实现分布式锁的原理
- 根据zk临时节点的唯一性,当多个请求同时创建相同的节点,只要谁能够创建成功 谁就能够获取到锁。
- 在创建节点时,如果该节点已经被其他请求创建则进入等待。
- 只要能够创建节点成功,就认为获取到了锁,则开始进入到正常业务逻辑操作,其他没有获取锁进行等待;
- 正常业务逻辑流程执行完后,调用zk关闭连接方式释放锁,从而是其他的请求开始进入到获取锁的资源。
三、代码
package com.xiaojie.template;
/**
* 分布式锁
*/
public abstract class DistributeLock {
public void getLock() {
//获取锁
if (tryLock()) {
System.out.println(Thread.currentThread().getName() + "获取锁成功");
} else {
//等待锁
waitLock();
//重新获取
getLock();
}
}
/**
* 等待锁
*/
protected abstract void waitLock();
/**
* 尝试获取锁
*/
protected abstract boolean tryLock();
/**
* 释放锁
*/
public abstract void unLock();
}
package com.xiaojie.template;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.concurrent.CountDownLatch;
/**
* 使用Zk获取分布式锁
*/
public class ZkDistributeLock extends DistributeLock {
//参数1 连接地址
private static final String ADDRESS = "192.168.139.154:2181,192.168.139.154:2182,192.168.139.154:2183";
// 参数2 zk超时时间
private static final int TIMEOUT = 5000;
// 创建我们的zk连接
private ZkClient zkClient = new ZkClient(ADDRESS, TIMEOUT);
/**
* 共同的创建临时节点
*/
private String lockPath = "/myLock";
private CountDownLatch countDownLatch = null;
@Override
protected void waitLock() {
/**
* 使用事件通知,如果有节点删除,则重新开始竞争锁
*/
IZkDataListener iZkDataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (countDownLatch != null) {
//如果删除节点,countDownLatch变为0,开始竞争锁
countDownLatch.countDown();
}
}
};
zkClient.subscribeDataChanges(lockPath, iZkDataListener);
// 2.使用countDownLatch等待
if (countDownLatch == null) {
countDownLatch = new CountDownLatch(1);
System.out.println("开始等待锁。。。。。。。。。。");
}
try {
// 如果当前计数器不是为0 就一直等待
countDownLatch.await();
} catch (Exception e) {
}
// 3. 如果当前节点被删除的情况下,需要重新进入到获取锁
zkClient.unsubscribeDataChanges(lockPath, iZkDataListener);
}
/**
* 尝试获取锁
*/
@Override
public boolean tryLock() {
// 获取锁的思想:多个jvm同时创建临时节点,只要谁能够创建成功 谁能够获取到锁
try {
zkClient.createEphemeral(lockPath);
return true;
} catch (Exception e) {
return false;
}
}
/**
* 释放锁。关闭临时节点,其他资源去竞争锁
*/
@Override
public void unLock() {
if (zkClient != null) {
zkClient.close();
System.out.println(Thread.currentThread().getName() + ",释放了锁>>>");
}
}
}
package com.xiaojie.utils;
/**
* 雪花算法生成id
*/
public class SnowFlake {
/**
* 起始的时间戳
*/
private final static long START_STMP = 1480166465631L;
/**
* 每一部分占用的位数
*/
private final static long SEQUENCE_BIT = 12; //序列号占用的位数
private final static long MACHINE_BIT = 5; //机器标识占用的位数
private final static long DATACENTER_BIT = 5;//数据中心占用的位数
/**
* 每一部分的最大值
*/
private final static long MAX_DATACENTER_NUM = -1L ^ (-1L << DATACENTER_BIT);
private final static long MAX_MACHINE_NUM = -1L ^ (-1L << MACHINE_BIT);
private final static long MAX_SEQUENCE = -1L ^ (-1L << SEQUENCE_BIT);
/**
* 每一部分向左的位移
*/
private final static long MACHINE_LEFT = SEQUENCE_BIT;
private final static long DATACENTER_LEFT = SEQUENCE_BIT + MACHINE_BIT;
private final static long TIMESTMP_LEFT = DATACENTER_LEFT + DATACENTER_BIT;
private long datacenterId; //数据中心
private long machineId; //机器标识
private long sequence = 0L; //序列号
private long lastStmp = -1L;//上一次时间戳
public SnowFlake() {
}
public SnowFlake(long datacenterId, long machineId) {
if (datacenterId > MAX_DATACENTER_NUM || datacenterId < 0) {
throw new IllegalArgumentException("datacenterId can't be greater than MAX_DATACENTER_NUM or less than 0");
}
if (machineId > MAX_MACHINE_NUM || machineId < 0) {
throw new IllegalArgumentException("machineId can't be greater than MAX_MACHINE_NUM or less than 0");
}
this.datacenterId = datacenterId;
this.machineId = machineId;
}
/**
* 产生下一个ID
*
* @return
*/
public long nextId() {
long currStmp = getNewstmp();
if (currStmp < lastStmp) {
throw new RuntimeException("Clock moved backwards. Refusing to generate id");
}
if (currStmp == lastStmp) {
//相同毫秒内,序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
//同一毫秒的序列数已经达到最大
if (sequence == 0L) {
currStmp = getNextMill();
}
} else {
//不同毫秒内,序列号置为0
sequence = 0L;
}
lastStmp = currStmp;
return (currStmp - START_STMP) << TIMESTMP_LEFT //时间戳部分
| datacenterId << DATACENTER_LEFT //数据中心部分
| machineId << MACHINE_LEFT //机器标识部分
| sequence; //序列号部分
}
private long getNextMill() {
long mill = getNewstmp();
while (mill <= lastStmp) {
mill = getNewstmp();
}
return mill;
}
private long getNewstmp() {
return System.currentTimeMillis();
}
}
package com.xiaojie.utils;
import com.xiaojie.template.DistributeLock;
import com.xiaojie.template.ZkDistributeLock;
public class IdGenerate implements Runnable {
private DistributeLock distributeLock = new ZkDistributeLock();
private SnowFlake snowFlake = new SnowFlake();
@Override
public void run() {
String s = genId();
}
public String genId() {
try {
distributeLock.getLock();
//获取到锁执行业务,获取不到等待
String id = String.valueOf(snowFlake.nextId());
System.out.println(">>>>>>>>>>>" + id);
return id;
} catch (Exception e) {
e.printStackTrace();
} finally {
distributeLock.unLock();
}
return null;
}
}
public class Test {
public static void main(String[] args) {
// IdGenerate idGenerate = new IdGenerate();
for (int i = 0; i < 100; i++) {
new Thread(new IdGenerate()).start();
}
}
}
参考:https://blog.csdn.net/weixin_44455476/article/details/105397576
|