Redis高并发分布锁实战
问题场景
场景一: 没有捕获异常
Boolean ret = stringRedisTemplate.opsForValue().setIfAbsent("lock_key", "1");
stringRedisTemplate.delete("lock_key");
问题: 以上场景在代码出现异常的时候,会出现死锁,导致后面的线程无法获取锁,会阻塞所有线程
场景二: 线程间交互删除锁
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key", "1", 10, TimeUnit.SECONDS);
stringRedisTemplate.delete(key);
问题: 相对于场景一多了锁的过期时间 。假如线程A执行业务代码的时间是15s,而锁的时间是10s,那么锁过期后自动会被删除,此时线程B获取锁,执行业务代码时间为8s,而这个时候线程A刚好执行完业务代码了,就会出现线程A把线程B的锁删除掉
String uuid = UUID.getUuid;
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent("key", uuid, 10, TimeUnit.SECONDS);
if (uuid.equals(stringRedisTemplate.opsForValue().get(key)) {
stringRedisTemplate.delete(key);
}
问题: 当线程A持有锁,执行完扣减库存后,假设锁过期时间是10s,恰好此时在执行9.99s的时候出现卡顿 ,等服务器反应过来之间,锁过期自动删除了,这个时候线程B获取锁,然后执行业务代码,此时线程A刚好反应过来,执行锁删除 ,这样就会把线程B的锁删除,要知道此时线程B是没有执行完业务代码的,锁删除后,线程C又获取锁,此时线程B执行完,又会把线程C的锁删除,依次类推
解决方案
方案: 使用Redisson分布式锁
@Autowire
public Redisson redisson;
public void stock () {
String key = "key";
RLock lock = redisson.getLock(key);
try {
lock.lock();
} catch(Exception e) {
lock.unlock();
}
}
优点: 自带锁续命 功能,默认30s过期时间,可以自行调整过期时间
LUA脚本模拟商品减库存
jedis.set("product_stock_10016", "15");
String script = " local count = redis.call('get', KEYS[1]) " +
" local a = tonumber(count) " +
" local b = tonumber(ARGV[1]) " +
" if a >= b then " +
" redis.call('set', KEYS[1], a-b) " +
" bb == 0 " +
" return 1 " +
" end " +
" return 0 ";
Object obj = jedis.eval(script, Arrays.asList("product_stock_10016"), Arrays.asList("10"));
System.out.println(obj);
总结
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
this.commandExecutor.syncSubscription(future);
try {
while(true) {
ttl = this.tryAcquire(leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
this.getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
this.getEntry(threadId).getLatch().acquire();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
Redisson分布式锁流程
- 高并发下Lua脚本保证了原子性
- Schedule定期锁续命
- 未获取锁的线程先Subscribe channel
- 自旋,再次尝试获取锁
- 如果还是未获取锁,则通过Semaphore->tryAcquire(ttl.TimeUnit)阻塞所有进入自旋代码块的线程。(
这样做的目的是为了不让其他线程因为不停的自旋而给服务器造成压力,所以让其他线程先阻塞一段时间,等阻塞时间结束,再次自旋 ) - 获取锁的线程解锁后,使用Redis的发布功能进行发布消息,订阅消息的线程调用release方法释放阻塞的线程,再次尝试获取锁
- 如果是调用Redisson的tryAcquire(1000,TimeUnit.SECONDS)方法,那么未获取到锁的线程不用进行自旋,因为时间一到,未获取到锁的线程就会自动往下走进入业务代码块
|