1. 什么是分布式锁?
我们先来看这样一个场景,如下图所示,两个用户同时去抢购秒杀商品,当秒杀服务同时收到秒杀请求时,都去进行库存扣减,此时在没有做任何处理的情况下,就会导致库存数量变成负数从而导致超卖现象。
这种情况下如果是单体项目,我们一般会选择加锁的方式来避免并发的问题。但是在分布式场景中,采用传统的锁并不能解决跨进程并发的问题,所以需要引入一个分布式锁,来解决多个节点之间的访问控制。
2. Zookeeper如何实现分布式锁
实现分布式的方式有很多种,本文主要讲述如何使用zookeeper实现分布式锁。我们可以基于zookeeper的两种特性来实现分布式锁,首先我们来看第一种:
2.1 唯一节点特性
我们可以基于唯一节点特性来实现分布式锁的操作,如下图所示。多个应用程序去抢占锁资源时,只需要在指定节点上创建一个 /Lock 节点,由于Zookeeper中节点的唯一性特性,使得只会有一个用户成功创建 /Lock 节点,剩下没有创建成功的用户表示竞争锁失败。
这种方法虽然能达到目的,但是会有一个问题,如下图所示,假设有非常多的节点需要等待获得锁,那么等待的方式自然是使用watcher机制来监听/lock节点的删除事件,一旦发现该节点被删除说明之前获得锁的节点已经释放了锁,那么此时剩下的B、C、D节点会同时收到删除事件从而去竞争锁,这个过程会产生惊群效应。
什么是“惊群效应”呢?简单来说就是如果存在许多的客户端在等待获取锁,当成功获取到锁的进程释放该节点后,所有处于等待状态的客户端都会被唤醒,这个时候zookeeper会在短时间内发送大量子节点变更事件给所有待获取锁的客户端,然后实际情况是只会有一个客户端获得锁。如果在集群规模比较大的情况下,会对zookeeper服务器的性能产生比较的影响。
2.2 有序节点
为了解决惊群效应,我们可以采用Zookeeper的有序节点特性来实现分布式锁。
如下图所示,每个客户端都往指定的节点下注册一个临时有序节点,越早创建的节点,节点的顺序编号就越小,那么我们可以判断子节点中最小的节点设置为获得锁。如果自己的节点不是所有子节点中最小的,意味着还没有获得锁。这个的实现和前面单节点实现的差异性在于,每个节点只需要监听比自己小的节点,当比自己小的节点删除以后,客户端会收到watcher事件,此时再次判断自己的节点是不是所有子节点中最小的,如果是则获得锁,否则就不断重复这个过程,这样就不会导致羊群效应,因为每个客户端只需要监控一个节点。
使用有序节点实现分布式锁的流程大致如下:
3. Curator实现分布式锁
在日常开发种,我们无需自己去实现分布式锁,只需要使用Curator即可实现分布式锁。
为了实现分布式锁,我们先演示一个存在并发异常的场景。
3.1 商品抢购场景
SQL
DROP TABLE IF EXISTS `goods_stock`;
CREATE TABLE `goods_stock` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT,
`goods_no` int(11) NOT NULL COMMENT '商品编号',
`stock` int(11) NULL DEFAULT NULL COMMENT '库存',
`isActive` smallint(6) NULL DEFAULT NULL COMMENT '是否上架(1上,0不是)',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 3 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
整个项目采用spring boot+mybatis-plus框架,代码一键生成。主要编写controller层代码即可:
这个抢购接口乍一看好像没啥问题,但实际上是存在问题的,因为他不具有原子性,在高并发场景下会造成数据多扣减。
我们可以使用jmeter对这个接口进行压测,用1500个线程,库存数量设置成100,监视数据库中库存的变化发现,整个库存变化过程是非常混乱的。可能一会数字变小,但是一会又变大了…
@RestController
@RequestMapping("/goods-stock")
public class GoodsStockController {
@Autowired
private IGoodsStockService goodsStockService;
@GetMapping("/{goodsNo}")
public String purchase(@PathVariable("goodsNo") Integer goodsNo) throws Exception {
QueryWrapper<GoodsStock> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("goods_no", goodsNo);
GoodsStock goodsStock = goodsStockService.getOne(queryWrapper);
Thread.sleep(new Random().nextInt(1000)); //增加问题出现的频率
if (goodsStock == null) {
return "指定商品不存在";
}
if (goodsStock.getStock().intValue() < 1) {
return "库存不够";
}
goodsStock.setStock(goodsStock.getStock() - 1);
boolean res = goodsStockService.updateById(goodsStock);
if (res) {
return "抢购:" + goodsNo + "成功";
}
return "抢购失败";
}
}
3.2 引入Zookeeper实现分布式锁
针对以上问题,我们可以引入zookeeper。curator客户端对于锁这块做了一些封装,curator提供了InterProcessMutex 这样一个api。除了分布式锁之外,还提供了leader选举、分布式队列等常用的功能。
-
InterProcessMutex:分布式可重入排它锁 -
InterProcessSemaphoreMutex:分布式排它锁 -
InterProcessReadWriteLock:分布式读写锁
具体的使用方法如下:
-
引入pom <dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
-
CuratorConfig @Configuration
public class CuratorConfig {
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("localhost:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(20000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorFramework.start();
return curatorFramework;
}
}
-
修改接口,增加锁机制 @Scope(scopeName = "prototype")
@RestController
@RequestMapping("/goods-stock")
public class GoodsStockController {
@Autowired
private IGoodsStockService goodsStockService;
@Autowired
private CuratorFramework curatorFramework;
@GetMapping("/{goodsNo}")
public String purchase(@PathVariable("goodsNo") Integer goodsNo) throws Exception {
QueryWrapper<GoodsStock> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("goods_no", goodsNo);
InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/Locks");
try {
lock.acquire();
GoodsStock goodsStock = goodsStockService.getOne(queryWrapper);
Thread.sleep(new Random().nextInt(1000));
if (goodsStock == null) {
return "指定商品不存在";
}
if (goodsStock.getStock().intValue() < 1) {
return "库存不够";
}
goodsStock.setStock(goodsStock.getStock() - 1);
boolean res = goodsStockService.updateById(goodsStock);
if (res) {
return "抢购书籍:" + goodsNo + "成功";
}
} finally {
lock.release();
}
return "抢购失败";
}
}
-
修改完成后,我们继续通过jmeter压测,可以看到就不存在库存超卖问题了。
4. Curator实现分布式锁的源码分析
前面我们已经理解的Zookeeper实现分布式锁的原理,以及基于Curator完成了分布式锁的使用,那么我们继续来分析Curator是如何基于代码实现这一过程。
4.1 抢占锁的逻辑
Curator构造函数
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
this.threadData = Maps.newConcurrentMap();
// maxLeases=1,表示可以获得分布式锁的线程数量(跨JVM)为1,即为互斥锁
// 锁节点的名称前缀,lock-0000001, 后面部分是有序递增的序列号
this.basePath = PathUtils.validatePath(path);
// internals的类型为LockInternals,InterProcessMutex将分布式锁的申请和释放操作委托 给internals执行
this.internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
acquire方法
调用acquire方法,该方法有两个重载方法,另外一个是带超时时间,当等待超时没有获得锁则放弃锁的占用。
public void acquire() throws Exception {
if (!this.internalLock(-1L, (TimeUnit)null)) {
throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);
}
}
internalLock
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//得到当前线程
Thread currentThread = Thread.currentThread();
//使用threadData存储线程重入的情况
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData != null) {
//同一线程再次acquire,首先判断当前的映射表内(threadData)是否有该线程的锁信息,如果有则原子+1,然后返回
lockData.lockCount.incrementAndGet();
return true;
} else {
// 映射表内没有对应的锁信息,尝试通过LockInternals获取锁
String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());
if (lockPath != null) {
// 成功获取锁,记录信息到映射表
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
this.threadData.put(currentThread, newLockData);
return true;
} else {
return false;
}
}
}
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount;
private LockData(Thread owningThread, String lockPath) {
this.lockCount = new AtomicInteger(1);
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
attemptLock
尝试获得锁,实际上是向zookeeper注册一个临时有序节点,并且判断当前创建的节点的顺序是否是最小节点。如果是则表示获得锁成功:
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
long startMillis = System.currentTimeMillis();
Long millisToWait = unit != null ? unit.toMillis(time) : null;
byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while(!isDone) {
isDone = true;
try {
ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);
hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);
} catch (NoNodeException var14) {
if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw var14;
}
isDone = false;
}
}
return hasTheLock ? ourPath : null;
}
createsTheLock
在Zookeeper中创建临时顺序节点.
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
if (lockNodeBytes != null) {
ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);
} else {
ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);
}
return ourPath;
}
internalLockLoop
循环等待来激活分布式锁,实现锁的公平性
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
try {
if (this.revocable.get() != null) {
((BackgroundPathable)this.client.getData().usingWatcher(this.revocableWatcher)).forPath(ourPath);
}
while(this.client.getState() == CuratorFrameworkState.STARTED && !haveTheLock) {
List<String> children = this.getSortedChildren();
String sequenceNodeName = ourPath.substring(this.basePath.length() + 1);
PredicateResults predicateResults = this.driver.getsTheLock(this.client, children, sequenceNodeName, this.maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
String previousSequencePath = this.basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
try {
((BackgroundPathable)this.client.getData().usingWatcher(this.watcher)).forPath(previousSequencePath);
if (millisToWait == null) {
this.wait();
} else {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
this.wait(millisToWait);
} else {
doDelete = true;
break;
}
}
} catch (NoNodeException var19) {
}
}
}
}
} catch (Exception var21) {
ThreadUtils.checkInterrupted(var21);
doDelete = true;
throw var21;
} finally {
if (doDelete) {
this.deleteOurPath(ourPath);
}
}
return haveTheLock;
}
driver.getsTheLock
StandardLockInternalsDriver
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
static void validateOurIndex(String sequenceNodeName, int ourIndex) throws KeeperException {
if (ourIndex < 0) {
throw new NoNodeException("Sequential path not found: " + sequenceNodeName);
}
}
4.2 释放锁的逻辑
release
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);
} else {
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount <= 0) {
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);
} else {
try {
this.internals.releaseLock(lockData.lockPath);
} finally {
this.threadData.remove(currentThread);
}
}
}
}
}
releaseLock
final void releaseLock(String lockPath) throws Exception {
this.client.removeWatchers();
this.revocable.set((Object)null);
this.deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception {
try {
((ChildrenDeletable)this.client.delete().guaranteed()).forPath(ourPath);
} catch (NoNodeException var3) {
}
}
4.3 锁撤销
InterProcessMutex 支持一种协商撤销互斥锁的机制, 可以用于死锁的情况。
想要撤销一个互斥锁可以调用下面这个方法:
void makeRevocable(RevocationSpec entry) {
this.revocable.set(entry);
}
这个方法可以让锁持有者来处理撤销动作。 当其他进程/线程想要你释放锁时,就会回调参数中的监听器方法。 但是,此方法不是强制撤销的,是一种协商机制。
当想要去撤销/释放一个锁时,可以通过 Revoker 中的静态方法来发出请求:
Revoker.attemptRevoke();
public static void attemptRevoke(CuratorFramework client, String path) throws Exception {
try {
client.setData().forPath(path, LockInternals.REVOKE_MESSAGE);
} catch (NoNodeException var3) {
}
}
- path :加锁的zk节点path,通常可以通过
InterProcessMutex.getParticipantNodes() 获得。
这个方法会发出撤销某个锁的请求。如果锁的持有者注册了上述的 RevocationListener 监听器,那么就会调用监听器方法协商撤销锁。
5. 项目地址
zookeeper-demo
|