Redisson是一个 Redis的开源客户端,也提供了分布式锁的实现。
Redisson官网:
- https://github.com/redisson/redisson/
- https://redisson.org/
一、Redisson 分布式锁使用
Redisson分布式锁使用起来还是蛮简单的。
1、添加 Redisson 配置类 引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.5</version>
</dependency>
创建 Redisson 配置类,注入 RedissonClient客户端。
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
}
2、使用 Redisson分布式锁
代码如下:
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private RedissonClient redissonClient;
public void disLockDemo(long productId) {
String lockKey = "DISTRIBUTE_LOCK:redissonLock:product_" + productId;
RLock redissonLock = redissonClient.getLock(lockKey);
redissonLock.lock();
try {
} finally {
redissonLock.unlock();
}
}
二、Redisson 分布式锁源码分析
使用分布式锁必须要考虑的一些问题:
- 互斥性:在任意时刻,只能有一个进程持有锁。
- 防死锁:即使有一个进程在持有锁的期间崩溃而未能主动释放锁,要有其他方式去释放锁从而保证其他进程能获取到锁。
- 不能释放别人的锁:加锁和解锁的必须是同一个进程。
- 锁的续期问题:业务执行时间超过锁的过期时间时,需要提前给锁的续期。
Redisson 是 Redis 官方推荐分布式锁实现方案,它采用 Watch Dog机制能够很好的解决锁续期的问题。 执行 lua脚本保证了多条命令执行的原子性操作。
带着上面分布式锁的一些问题查看源码。
1、获取分布式锁对象
简单了解一下。
1.1 创建 RedissonClient
我们在配置类中通过 Redisson.create(config) 方法创建了 RedissonClient对象,并注入到 IOC容器中。
1.2 获取分布式锁对象
使用 Redisson 客户端来 获取分布式锁对象。
2、加锁代码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl != null) {
RFuture<RedissonLockEntry> future = this.subscribe(threadId);
if (interruptibly) {
this.commandExecutor.syncSubscriptionInterrupted(future);
} else {
this.commandExecutor.syncSubscription(future);
}
try {
while(true) {
ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
if (ttl == null) {
return;
}
if (ttl >= 0L) {
try {
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException var13) {
if (interruptibly) {
throw var13;
}
((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else if (interruptibly) {
((RedissonLockEntry)future.getNow()).getLatch().acquire();
} else {
((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
}
}
} finally {
this.unsubscribe(future, threadId);
}
}
}
2.1 异步加锁机制
查看 tryAcquire() 加锁方法。 通过源码,看到加锁其实是通过一段 lua 脚本实现的,如下:
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end ;
return redis.call('pttl', KEYS[1]);
- KEYS[1] 代表的是你加锁的 key。
- ARGV[1] 代表的是锁 key 的默认生存时间,默认 30 秒。
- ARGV[2] 代表的是加锁的客户端的线程 ID。通过
getLockName 方法组装了一下。 - ARGV[2] 后面的 1:为了支持可重入锁做的计数统计。
Redisson 实现分布式锁的共享资源的存储结构是 hash数据结构:
key 是锁的名称,field 是客户端 ID,value 是该客户端加锁(可重入)的次数。
假设此时,客户端 1 来尝试加锁,查看加锁的 lua 脚本:
- 第一段 if 判断语句,如果你要加锁的那个锁 key 不存在的话,进行加锁。此时锁 key不存在,向Redis中设置一个 hash 结构的数据,则客户端 1加锁成功,返回 null。
2.1.1 锁的续期机制
客户端 1 加锁的那个锁 key 默认生存时间才 30 秒 ,如果超过了 30 秒,客户端 1 还想一直持有这把锁,就需要提前进行锁的续期操作。
Redisson 提供了一个 Watch dog 机制 来解决锁的续期问题, 只要客户端 1 一旦加锁成功,就会启动一个 Watch Dog。 进入 scheduleExpirationRenewal方法 ,重点查看 renewExpiration方法。 锁续期的 lua 脚本如下:
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('pexpire', KEYS[1], ARGV[1]);
return 1;
end ;
return 0;
从源码我们看到 leaseTime 必须是 -1 才会开启 Watch Dog 机制,我们发现:
- 如果想开启 Watch Dog 机制必须使用默认的加锁时间为 30s。
- 如果自己自定义时间,即使用 tryLock,锁并不会延长,不会触发Watch Dog 机制。
Watch Dog 机制其实就是一个后台定时任务线程,获取锁成功之后,会将持有锁的线程放入到一个 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 检查一下,如果客户端 1 还持有锁 key(判断客户端是否还持有 key,其实就是遍历 EXPIRATION_RENEWAL_MAP 里面线程 id 然后根据线程 id 去 Redis 中查,如果存在就会延长 key 的时间),那么就会不断的延长锁 key 的生存时间。
如果服务宕机了,Watch Dog 机制线程也就没有了,此时就不会延长 key 的过期时间,到了 30s 之后就会自动过期了,其他线程就可以获取到锁。
2.1.2 可重入加锁机制
Redisson 也是支持可重入锁的,比如:客户端 1 加锁代码:
@Override
public void demo() {
RLock lock = redissonSingle.getLock("myLock");
try {
lock.lock();
lock.lock();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
lock.unlock();
}
}
此时,如果客户端 1 又来尝试加锁,继续分析加锁的 lua 脚本。
- 首先,第一个 if 判断,你要加锁的那个锁 key 已经存在了。
- 然后,第二个 if 判断,判断一下,加锁的那个锁 key的 hash 数据结构中,是否包含客户端 1 的 ID,
- 此时数据结构的是客户端 1 的 ID,即包含客户端 1的 ID,然后就执行行可重入锁的命令,将 hash 结构的 value数据 + 1,返回 null。
2.2 锁互斥机制
上面客户端 1加锁成功,此时,如果客户端 2 来尝试加锁,继续分析加锁的 lua 脚本:
- 首先,第一个 if 判断,你要加锁的那个锁 key 已经存在了。
- 然后,第二个 if 判断,判断一下,加锁的那个锁 key的 hash 数据结构中,是否包含客户端 2 的 ID,如果包含就是执行可重入锁的赋值,此时 hash数据结构是客户端 1 的 ID,不包含客户端 2的 ID,所以,返回加锁的那个锁 key的剩余存活时间。
接着查看 lock方法中的 死循环部分。 流程大致如下:
- 尝试获取锁,返回 null 则说明加锁成功,返回一个ttl,则说明已经存在该锁,ttl 为锁的剩余存活时间。
- 如果此时客户端 2 进程获取锁失败,那么使用客户端 2 的线程 id,通过 Redis 的 channel 订阅锁释放的事件。
- 进入死循环中,尝试重新获取锁。
- 如果在重试中拿到了锁,则直接返回。
- 如果锁当前还是被占用的,那么等待释放锁的消息。通过使用了 JDK 的信号量 Semaphore 来阻塞线程,当 ttl 为锁的剩余存活时间为0后,信号量的 release() 方法会被调用,此时被信号量阻塞的等待队列中的一个线程就可以继续尝试获取锁了。
注意:
- 当锁正在被占用时,等待获取锁的进程并不是真正通过一个 while(true) 死循环去获取锁(占 CPU资源),而时使用 JDK 的信号量 Semaphore 来阻塞线程(间断性的不断尝试获取锁),是会释放 CPU资源的。
3、锁释放代码
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise();
RFuture<Boolean> future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
this.cancelExpirationRenewal(threadId);
if (e != null) {
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
result.trySuccess((Object)null);
}
});
return result;
}
3.1 异步释放锁机制
查看 unlockInnerAsync方法 。释放锁也是执行的 lua 脚本:
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end ;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end ;
return nil;
- 首先,第一段 if 判断语句,判断 key 是否存在的话,进行加锁。此时锁 key不存在,则客户端 1加锁成功,向Redis中设置一个 hash 结构的数据。返回 null。
- 然后,第二个 if 判断,判断一下该客户端对应的锁的 hash 结构的 value 值是否递减为 0,
- 如果递减不为 0,则重入锁的解锁,返回0。
- 如果递减为 0,则进行删除,返回1。
3.2 取消 Watch Dog机制
查看 cancelExpirationRenewal方法。 取消 Watch Dog 机制,即将 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的线程 id 删除。
3.3 通知阻塞等待的进程
利用 Redis 的发布订阅机制,广播释放锁的消息,通知阻塞等待的进程(向通道名为 redisson_lock__channel publish 一条 UNLOCK_MESSAGE 信息)。
– 求知若饥,虚心若愚。
|