一、背景
??我在之前的文章SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理详细介绍了它的使用及其原理,现在我们也根据这个思路,用zookeeper原生的方式来实现一个分布式锁,加深对分布式锁的理解。本文中Spring Boot的版本是2.5.2,zookeeper的版本是3.6.3。
??我们大致的大致的流程图如下图,可作为我们查看代码的一个思路,不然看的头大。(当然本图是没有包含可重入锁的流程判断在里面的)
二、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.6</version>
<relativePath/>
</parent>
<groupId>com.alian</groupId>
<artifactId>zklock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zklock</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
三、配置
3.1、application.yml配置
application.yml
server:
port: 8082
servlet:
context-path: /zklock
app:
zookeeper:
server: 10.130.3.16:2181
session-timeout: 15000
root-lock-path: /root/alian
3.2、属性配置类
??此配置类不懂的可以参考我另一篇文章:Spring Boot读取配置文件常用方式
AppProperties.java
package com.alian.zklock.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "app.zookeeper")
public class AppProperties {
private String server;
private int sessionTimeout;
private String rootLockPath;
}
3.3、ZookeeperConfig配置件
ZookeeperConfig.java
package com.alian.zklock.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.CountDownLatch;
@Slf4j
@Configuration
public class ZookeeperConfig {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
@Autowired
private AppProperties appProperties;
@Bean
public ZooKeeper zookeeper() throws Exception {
ZooKeeper zookeeper = new ZooKeeper(appProperties.getServer(), appProperties.getSessionTimeout(), event -> {
log.info("Receive watched event: {}", event.getState());
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
if (KeeperState.SyncConnected == keeperState) {
if (EventType.None == eventType) {
countDownLatch.countDown();
log.info("zookeeper建立连接");
}
}
});
countDownLatch.await();
return zookeeper;
}
}
??这里主要是对ZooKeeper 进行连接配置,关于CountDownLatch的使用,本文最后有相关的介绍。
四、实战
??定义了两个方法:加锁和释放锁。
4.1、接口
ILockService.java
package com.alian.zklock.service;
import java.util.concurrent.TimeUnit;
public interface ILockService {
boolean lock(String lockPath, long time, TimeUnit unit);
void release();
}
4.2、接口核心实现
??这个实现类的注释,我想已经很详细了。可以细细阅读,可以加深你对zookeeper分布式锁实现原理的理解。
ZookeeperLockService.java
package com.alian.zklock.service.impl;
import com.alian.zklock.service.ILockService;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class ZookeeperLockService implements ILockService {
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
@Autowired
private ZooKeeper zooKeeper;
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
public boolean lock(String lockPath, long time, TimeUnit unit) {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
lockData.lockCount.incrementAndGet();
return true;
}
String currentLockPath = attemptLock(lockPath, time, unit);
if (StringUtils.isNotBlank(currentLockPath)) {
LockData newLockData = new LockData(currentThread, currentLockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
public String attemptLock(String lockPath, long time, TimeUnit unit) {
try {
String currentLockPath = zooKeeper.create(lockPath + "/", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
log.info("线程:【{}】->【{}】尝试竞争锁", Thread.currentThread().getName(), currentLockPath);
if (StringUtils.isBlank(currentLockPath)) {
throw new Exception("生成临时节点异常");
}
boolean hasLock = checkLocked(lockPath, currentLockPath, time, unit);
return hasLock ? currentLockPath : null;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public boolean checkLocked(String lockPath, String currentLockPath, long time, TimeUnit unit) {
boolean hasLock = false;
boolean toDelete = false;
try {
while (!hasLock) {
Pair<Boolean, String> pair = getsTheLock(lockPath, currentLockPath);
boolean currentLock = pair.getLeft();
String preSequencePath = pair.getRight();
if (currentLock) {
hasLock = true;
} else {
final CountDownLatch latch = new CountDownLatch(1);
Watcher watcher = watchedEvent -> {
log.info("监听到的变化【】 watchedEvent = {}", watchedEvent);
latch.countDown();
};
Stat stat = zooKeeper.exists(preSequencePath, watcher);
if (stat != null) {
log.info("线程:【{}】等待锁【{}】释放", Thread.currentThread().getName(), preSequencePath);
boolean await = latch.await(time, unit);
if (!await) {
log.info("获取锁超时");
toDelete = true;
break;
}
}
Pair<Boolean, String> checkPair = getsTheLock(lockPath, currentLockPath);
if (checkPair.getLeft()) {
hasLock = true;
}
}
}
} catch (Exception e) {
log.error("检查是否获取到锁异常", e);
if (e instanceof InterruptedException) {
toDelete = true;
}
} finally {
if (toDelete) {
deleteCurrentPath(currentLockPath);
}
}
return hasLock;
}
private Pair<Boolean, String> getsTheLock(String lockPath, String currentLock) throws Exception {
List<String> childrenList = zooKeeper.getChildren(lockPath, false);
Collections.sort(childrenList);
String currentLockNode = currentLock.substring(currentLock.lastIndexOf("/") + 1);
if (currentLockNode.equals(childrenList.get(0))) {
log.info("节点【{}】成功的获取分布式锁", currentLock);
return Pair.of(true, "");
}
int index = Collections.binarySearch(childrenList, currentLockNode);
if (index < 0) {
throw new Exception("节点没有找到: " + currentLockNode);
}
String preSequencePath = lockPath + "/" + childrenList.get(index - 1);
return Pair.of(false, preSequencePath);
}
private void deleteCurrentPath(String currentLockPath) {
try {
Stat stat = zooKeeper.exists(currentLockPath, false);
if (stat != null) {
zooKeeper.delete(currentLockPath, -1);
}
} catch (InterruptedException | KeeperException e) {
log.error("删除节点异常");
}
}
@Override
public void release() {
Thread currentThread = Thread.currentThread();
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: ");
}
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: ");
}
try {
zooKeeper.delete(lockData.lockPath, -1);
log.info("线程:【{}】释放锁【{}】", Thread.currentThread().getName(), lockData.lockPath);
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
} finally {
threadData.remove(currentThread);
}
}
}
4.3、测试类
??我们为了方便检验我们的分布式锁,初始化库存为100,就使用3个线程进行并发,每个线程减55个库存,我这里也不使用测试工具jmeter了,就相当于单机测试了。(如果是要进行分布式部署测试,那么库存值不能像我这样直接在程序写死 ,可以放redis或者数据库,然后通过负载均衡、压力测试工具jmeter去完成,具体使用可以参考:windows下Nginx配置及负载均衡使用),我们主要目的是:为了验证我们写的分布式锁,加深对分布式锁的理解。
TestLockService.java
package com.alian.zklock.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Service
public class TestLockService {
@Autowired
private ILockService lockService;
AtomicInteger stock = new AtomicInteger(100);
@PostConstruct
public void testLock() {
final CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
countDownLatch.await();
boolean lock = lockService.lock("/root/alian", 10, TimeUnit.SECONDS);
if (lock) {
Thread.sleep(100);
decrement();
lockService.release();
log.info("线程【{}】扣减完,剩余库存:{}", Thread.currentThread().getName(), stock.get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
}
}, "Thread" + i).start();
countDownLatch.countDown();
}
}
private void decrement() {
for (int i = 0; i < 5; i++) {
stock.decrementAndGet();
}
}
}
4.4、结果
运行结果图: 从我们的结果图可以看出来(为了方便,节点前面的变化文章里就省略了,实际是存在的):
- 同时三个线程(Thread0、Thread1、Thread2)创建了节点(180、179,181)去抢占资源
- Thread1创建的179号节点是最小的,获取到了锁,这时候,Thread0监听179节点,Thread2监听180节点
- Thread1扣减库存5次,然后释放锁,也就是删除了节点179,触发监听
- 因为Thread0监听179节点,所以Thread0继续执行抢占到了锁,同样扣减库存后,删除180节点
- 然后Thread2监听的是180节点,同样的Thread2抢占到了锁,扣减库存,删除181节点
- 最后得到库存85
超时的验证则可以在业务执行的时候设置一个休眠时间,可重入锁也是支持的,直接使用curator里面的,优秀的东西就直接拿来用了
4.5、关于CountDownLatch
也许有很多小伙伴,不知道CountDownLatch是怎么用的,我这里就简单介绍下,主要有两个方法:
递减锁存器的计数,如果计数到达零,则释放所有等待的线程,如果当前计数大于零,则将计数减少。
- public boolean await(long timeout,TimeUnit unit) throws InterruptedException
??使当前线程在锁存器倒计数至0之前一直等待,除非线程被中断或超出了指定的等待时间。如果计数到达零,则返回true;如果在计数到达零之前超过了等待时间,则返回false,以下三种情况之一前,该线程将一直出于休眠状态:
- 如果计数到达零,则该方法返回true值
- 如果超出了指定的等待时间,则返回值为false。如果该时间小于等于零,则该方法根本不会等待
- 如果当前线程,在进入此方法时已经设置了该线程的中断状态;或者在等待时被中断,则抛出InterruptedException,并且清除当前线程的已中断状态
??类似本文中的测试方法,就相当于并发,当三个线程都创建完,都走到countDownLatch.await()这里就不执行了,直到执行countDownLatch.countDown()后面才会走。
public void race() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
countDownLatch.await();
Thread.sleep(100);
log.info(Thread.currentThread().getName()+"开始跑步");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread" + i).start();
}
countDownLatch.countDown();
log.info("主线程执行完");
}
结果:
2021-10-26 20:43:06 458 [main] INFO:主线程执行完
2021-10-26 20:43:06 561 [Thread2] INFO:Thread2开始跑步
2021-10-26 20:43:06 561 [Thread0] INFO:Thread0开始跑步
2021-10-26 20:43:06 561 [Thread1] INFO:Thread1开始跑步
??我们也可以反过来,使主线程阻塞,这个时候就是线程执行到countDownLatch.await()后,主线程后面的不执行,直到前面的子线程都执行完,主线程才往后执行。
public void multitasking() throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
log.info(Thread.currentThread().getName()+"执行完");
countDownLatch.countDown();
}, "Thread" + i).start();
}
countDownLatch.await();
log.info("主线程执行完");
}
结果:
2021-10-26 20:45:21 053 [Thread0] INFO:Thread0执行完
2021-10-26 20:45:21 053 [Thread1] INFO:Thread1执行完
2021-10-26 20:45:21 053 [Thread2] INFO:Thread2执行完
2021-10-26 20:45:21 053 [main] INFO:主线程执行完
结语
??也许本文的写的分布式还有些许的瑕疵,但我们主要目的是:为了加深对zookeeper分布式锁实现原理的理解,实际使用中我们还是使用curator是比较方便和稳定,具体可以参考我另外一篇文章:SpringBoot基于Zookeeper和Curator实现分布式锁并分析其原理。
|