title: Redis分布式锁实战 date: 2022-05-13 09:44:47 tags:
1.使用场景
想直接获取加锁 和解锁 代码,请直接到代码处
在下单场景减库存时我们一般会将库存查询出来,进行库存的扣除
@GetMapping(value = "order")
public R order() {
int stock = RedisUtil.getObject("stock", Integer.class);
if (stock > 0) {
RedisUtil.set("stock", --stock);
}
return R.ok(stock);
}
上述的操作看起来很正常,但是其实是有问题的,试想一下当我们有两个线程同时访问这个接口会发生什么
Thread-1 查询库存结果为100
Thread-2 也来查询库存,此时Thread-1还没有执行减少库存操作,Thread-2 查询库存的结果也是100
Thread-1 Set库存为99
Thread-2 Set库存为99
这样就出问题了,明天扣了两次库存,但是库存仅仅减了1次
使用Idea时,我们可以使在断点处右键 将Suspend调整为Thread ,仅阻断线程,并使用多个客户端同时请求接口,即可复现上述过程
2.加锁解决
- synchronized 我们可以用Java提供的
synchronized 关键字将方法 - 分布式锁,分布式锁的实现方案有很多种, zookeeper,redis,db,这边我们使用redis来实现以下分布式锁
3.分布式锁
上述两个线程同时进行的时候没有正确扣除库存正是因为【查询库存】和【扣除库存】不是一个原子操作,我们增加一个锁的机制,当线程持有锁的时候才允许进行【查询库存】和【扣除库存】,redis有一个sexNx 命令允许当指定的key不存在时才进行set操作,在java中为RedisTemplate的setIfAbsent方法,这个方法保证了同时只能有一个线程set成功,set成功时就表明我们拿到了锁,可以进行原子操作了,当我们执行完原子操作时我们也需要将锁释放掉,在redis实现中也就是将key删除,允许下一个线程set值,加锁和释放锁的代码如下
public static boolean lock(String key, String value) {
final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value));
if (result) {
log.info("[redisTemplate redis]设置锁缓存 缓存 url:{} ", key);
}
return result;
}
public static boolean unlock(String key) {
final boolean result = Boolean.TRUE.equals(redisTemplate.delete(CacheConstant.LOCK_KEY + key));
if (result) {
log.info("[redisTemplate redis]释放锁 缓存 url:{}", key);
}
return result;
}
那么我们将代码稍微修改一下,来利用锁来完成接口的改进
@GetMapping(value = "order")
public R order() {
boolean lock;
int stock;
try {
lock = RedisUtil.lock("stock", "");
if (!lock) {
return R.failed("服务繁忙,稍后再试");
}
stock = RedisUtil.getObject("stock", Integer.class);
if (stock > 0) {
RedisUtil.set("stock", --stock);
}
} finally {
RedisUtil.unlock("stock");
}
return R.ok(stock);
}
此时,我们再将断点放在获取库存之后,并先用一个终端请求接口
然后,我们再从终端2发起请求,可以看到我们终端1没有结束自己的原子操作时,终端2是无法进行库存的扣除的
4.增加失效时间
在上一步中,我们仿佛已经完成了需求,同时进行扣除库存的只有一个线程,但是试想一下,当线程获取到锁之后,服务突然宕机了,这时候就算及时重启机器,那么锁也一直得不到释放,那么扣除库存接口始终无法获取到锁,这肯定不是我们想要的效果,那么我们改进一下我们加锁的方法,增加一下失效时间,即使服务宕机了,我们重启机器之后,锁也能正常释放掉不会影响一下个线程获取到锁
public static boolean lock(String key, String value, long time) {
final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value, time, TimeUnit.SECONDS));
if (result) {
log.info("[redisTemplate redis]设置锁缓存 缓存 url:{} ========缓存时间为{}秒", key, time);
}
return result;
}
5.增加线程唯一值
还有一种情况会导致我们可能误删除别人的锁,比如当线程1执行完流程之后准备释放锁之时,这时候锁正好失效了,线程2此时获取到锁,线程1释放锁时并不知道锁失效了,那么线程1执行释放操作就会将线程2拥有的锁释放掉,这肯定是不对的,那么我们再对unlock方法改进一下
public static boolean unlock(String key, String value) {
if (Objects.equals(value, redisTemplate.opsForValue().get(CacheConstant.LOCK_KEY))) {
final boolean result = Boolean.TRUE.equals(redisTemplate.delete(CacheConstant.LOCK_KEY + key));
if (result) {
log.info("[redisTemplate redis]释放锁 缓存 url:{}", key);
}
return result;
}
return false;
}
@GetMapping(value = "order")
public R order() {
boolean lock;
int stock;
String uuid = IdUtil.fastUUID();
try {
lock = RedisUtil.lock("stock", uuid, 60L);
if (!lock) {
return R.failed("服务繁忙,稍后再试");
}
stock = RedisUtil.getObject("stock", Integer.class);
if (stock > 0) {
RedisUtil.set("stock", --stock);
}
} finally {
RedisUtil.unlock("stock", uuid);
}
return R.ok(stock);
}
6.Lua脚本
上面我们说了为了防止误删别人的锁,我们需要在删除锁时判断一下锁是否为自己持有,那么问题来了,我们这个查询锁值和删除锁的操作也并不是一个原子操作,也就是说可能你在获取锁值时锁还为自己持有,但是执行删除时锁已经不为自己持有了,还是会可能误删别人的锁,想要保证释放锁的原子性,我们可以通过redis原生支持的lua脚本来实现
public static boolean unlock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(CacheConstant.LOCK_KEY + key), value);
if (Objects.equals(1L, result)) {
log.info("[redisTemplate redis]释放锁 缓存 url:{}", key);
return true;
}
return false;
}
7.Lua是如何实现原子性的
可以看到Lua脚本的大致意思也是跟我们自己写的代码差不多,判断是否为自己持有如果是才进行删除,那为什么Lua脚本可以保证原子性呢
Redis使用同一个Lua解释器来执行所有命令,同时,Redis保证以一种原子性的方式来执行脚本:当lua脚本在执行的时候,不会有其他脚本和命令同时执行,这种语义类似于 MULTI/EXEC。从别的客户端的视角来看,一个lua脚本要么不可见,要么已经执行完。
然而这也意味着,执行一个较慢的lua脚本是不建议的,由于脚本的开销非常低,构造一个快速执行的脚本并非难事。但是你要注意到,当你正在执行一个比较慢的脚本时,所以其他的客户端都无法执行命令。
8.代码演示
代码演示
public static boolean lock(String key, String value, long time) {
final boolean result = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(CacheConstant.LOCK_KEY + key, value, time, TimeUnit.SECONDS));
if (result) {
log.info("[redisTemplate redis]设置锁缓存 缓存 url:{} ========缓存时间为{}秒", key, time);
}
return result;
}
public static boolean unlock(String key, String value) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
Long result = redisTemplate.execute(redisScript, Collections.singletonList(CacheConstant.LOCK_KEY + key), value);
if (Objects.equals(1L, result)) {
log.info("[redisTemplate redis]释放锁 缓存 url:{}", key);
return true;
}
return false;
}
@GetMapping(value = "order")
public R order() {
boolean lock;
int stock;
String uuid = IdUtil.fastUUID();
try {
lock = RedisUtil.lock("stock", uuid,6000L);
if (!lock) {
return R.failed("服务繁忙,稍后再试");
}
stock = RedisUtil.getObject("stock", Integer.class);
if (stock > 0) {
RedisUtil.set("stock", --stock);
}
} finally {
RedisUtil.unlock("stock", uuid);
}
return R.ok(stock);
}
9. 总结
分布式锁在使用的过程中还是有挺多的讲究的,主要看应用场景例如还需要保证上述流程中可能碰到的锁失效时间小于代码执行时间,锁提前失效的问题,锁如何保证重入性的问题,欢迎大家讨论
|