一、什么是分布式锁
1、分布式锁的概念
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
- 多进程可见:多个JVM都能看得到。比如redis、mysql等,那么所有的JVM进程都能看得到
- 互斥锁:只允许一个进程能拿到锁
- 高可用:大多数情况下,获取锁都能获取成功
- 高性能:本身加锁后,线程变成了串行执行,从而会影响性能。所以获取锁的步骤上应该高性能
- 安全性:获取锁应该考虑异常的情况。获取锁后宕机怎么办? 死锁怎么办
2、分布式锁常见三种方式
分布式锁的核心是实现多进程之间互斥,而满足条件的并且常见的有三种:mysql、redis、zookeeper
- mysq: 数据库都具有事务的机制,在执行事务操作时,mysql会自动分配一个互斥锁。所以说在事务之间是互斥的,只有一个人能去执行。在业务执行前,去数据库中申请一个互斥锁,然后再去执行业务,当业务执行结束后,提交事务,锁也就释放了。当业务抛出了异常时,会自动触发回滚,锁也就释放了
- redis: 利用setnx互斥命令。 获取锁就在reids中setnx一条数据,如果没有该key,那么添加成功,随之获取锁成功,反之一样。当删除该key,就是释放锁成功。
二、基于redis实现分布式锁
1、Setnx的介绍
实现分布式锁时需要实现的两个基本方法:
获取锁:
127.0.0.1:6379[1]> setnx lock thread1 (integer) 1 # 现在成功获取锁了 127.0.0.1:6379[1]> setnx lock thread1 (integer) 0 # 重复获取锁,发现获取失败
释放锁:
127.0.0.1:6379[1]> del lock (integer) 1 # 删除key,就是手动释放锁 127.0.0.1:6379[1]> KEYS * (empty array) # 发现key,已经没有了
- 超时释放:在获取锁时加入过期时间。 可以避免服务宕机,然后死锁
127.0.0.1:6379[1]> set lock thread ex 8 nx OK 127.0.0.1:6379[1]> ttl lock (integer) 6 127.0.0.1:6379[1]> ttl lock (integer) 4 127.0.0.1:6379[1]> ttl lock (integer) 1 127.0.0.1:6379[1]> ttl lock (integer) -2
2、 实现redis分布式锁
- 添加释放锁需要判断是否是当前线程,避免锁误删操作。
- 添加LUA脚本解决多条命令原子性问题
1.定义接口,利用redis实现分布式锁功能
尝试获取锁:是因为采用的是非阻塞式。获取锁只是获取一次。要么成功要么失败。
public interface ILock {
boolean tryLock(long timeoutSec);
void unlock();
}
2. 实现接口,具体实现获取锁和释放锁
1.在获取锁时存入线程标识(可以用UUID表示)
2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致。从而避免误删别人的锁。
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
@Override
public boolean tryLock(long timeoutSec) {
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
}
3.释放锁的lua脚本
-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) == ARGV[1]) then
-- 释放锁 del key
return redis.call('del', KEYS[1])
end
return 0
三、基于Redisson实现分布式锁
1、Redisson的介绍
Redisson是开源的框架,在redis基础上实现的分布式工具的集合。而分布式锁只是Redisson的一个子集。
四、SpringBoot集成Redisson
1、配置环境
- 引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
- 配置Redisson客户端:
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.75.111:6379").setPassword("123321");
return Redisson.create(config);
}
}
为什么不使用yml文件和start呢? 添加配置可以使用yml文件,跟springBoot整合来实现,官网还提供了start。 因为会替代spring提供的redis的配置和实现。 建议使用Redisson时,自己进行配置bean,不和spring提供的redis配置进行掺和。
2、测试
@Resource
private RedissonClient redissonClient;
@Test
void testRedisson() throws InterruptedException {
RLock lock = redissonClient.getLock("anyLock");
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
if(isLock){
try {
System.out.println("执行业务");
}finally {
lock.unlock();
}
}
}
3、可重入锁的原理
1、什么是可重入锁?
可重入锁:
- 指的是同一个线程,可以多次获得一把锁。
- 利用Hash结构记录线程id和重试次数。
- 利用watchDog延续锁时间。
- 利用信号量控制锁重试等待。
缺点:redis宕机引起锁失效问题
例如方法A调用方法B,在方法A中先去获得锁,然后执行业务去调用B,而B又要获取同一把锁。 而例如set key value nx time 就是不可重入锁,就会出现死锁的状态。例如:如果A获得锁后,去执行B,B如果也想获得锁,但是A并没有释放锁,所以说就会出现死锁状态。
2、获取锁和释放锁
需要Hash类型
- key中记录锁的名称
- field记录线程标识
- value记录锁的重试次数。
获取锁和释放锁的流程:
- 创建锁的对象
- 在方法A中,获取锁,tryLock时记录锁的线程标识和重试次数为1
- 在方法B中,获取锁。如果是锁已经存在,并且是同一线程时,只需要在重试次数中加1。代表是第二次获取同一个锁。
- 在方法B或者方法A中,执行完业务,释放锁的逻辑是:需要把重试次数减1,并判断是否为0,如果为0则删除锁。
@SpringBootTest
class RedissonTest {
@Resource
private RedissonClient redissonClient;
private RLock lock;
@BeforeEach
void setUp() {
lock = redissonClient.getLock("order");
}
@Test
void method1() throws InterruptedException {
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
if (!isLock) {
log.error("获取锁失败 .... 1");
return;
}
try {
log.info("获取锁成功 .... 1");
method2();
log.info("开始执行业务 ... 1");
} finally {
log.warn("准备释放锁 .... 1");
lock.unlock();
}
}
void method2() {
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("获取锁失败 .... 2");
return;
}
try {
log.info("获取锁成功 .... 2");
log.info("开始执行业务 ... 2");
} finally {
log.warn("准备释放锁 .... 2");
lock.unlock();
}
}
}
4. 获取锁和释放锁的lua脚本
获取锁和释放锁一定要采用Lua脚本,来确保获取和释放锁的原子性。
获取锁:
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
if(redis.call('exists', key) == 0) then
redis.call('hset', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
if(redis.call('hexists', key, threadId) == 1) then
redis.call('hincrby', key, threadId, '1');
redis.call('expire', key, releaseTime);
return 1;
end;
return 0;
释放锁:
local key = KEYS[1];
local threadId = ARGV[1];
local releaseTime = ARGV[2];
if (redis.call('HEXISTS', key, threadId) == 0) then
return nil;
end;
local count = redis.call('HINCRBY', key, threadId, -1);
if (count > 0) then
redis.call('EXPIRE', key, releaseTime);
return nil;
else
redis.call('DEL', key);
return nil;
end;
4、 Redisson的锁重试和WatchDog机制
1、什么可重试问题
可重试:利用信号量和PubSub【发布订阅】功能实现等待、唤醒、获取锁失败的重试机制。
第一次尝试获取锁失败以后,并不是立即失败,而是利用了redis的PubSub的机制,做一个等待,等待释放锁的消息。 而获取锁成功的线程,在释放锁中会发送一条释放锁的消息。从而会被正在等待的线程通过订阅机制捕获到。 当等到释放锁的消息后,就会重试机制。
不可重试: 获取锁只尝试一次就返回false。
boolean isLock = lock.tryLock();
tryLock()的参数:
long waitTime:获取锁的最大等待时常。当第一次获取锁失败后,不会立即返回false,而是在规定的时间内进行重试,直到超时才会返回false。
long leaseTime:自动失效释放的时间
TimeUnit unit:时间单位
2、深入原码解释:
从获取锁这条命令开始往下执行: boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
return true;
} else {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
return false;
} else {
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
this.unsubscribe(subscribeFuture, threadId);
}
});
}
this.acquireFailed(waitTime, unit, threadId);
return false;
try {
time -= System.currentTimeMillis() - current;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
boolean var20 = false;
return var20;
} else {
boolean var16;
do {
long currentTime = System.currentTimeMillis();
ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
if (ttl == null) {
var16 = true;
return var16;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0L) {
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
currentTime = System.currentTimeMillis();
if (ttl >= 0L && ttl < time) {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
} while(time > 0L);
this.acquireFailed(waitTime, unit, threadId);
var16 = false;
return var16;
}
} finally {
this.unsubscribe(subscribeFuture, threadId);
}
}
}
}
}
2、WatchDog机制
超时释放: 锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。
超时续约: 利用watchDog看门狗机制,每隔一段时间(releseTime/3),重置超时时间
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1L) {
return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e == null) {
if (ttlRemaining == null) {
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
五、分布式锁主从一致性问题
1、什么是主从一致性问题
如果redis提供了主从集群,主从同步存在延迟,挡住宕机后,如果从并同步中的锁数据,则会出现锁实现。
主节点:负责增删改 从节点:只负责读的问题 那么主节点会把数据同步到从节点中,但是同步时会存在延迟,即使延迟很短也是会存在。当获取锁后,主从数据还没有来及同步时,主节点宕机了。主备切换后,在新的master节点中,发现锁并不存在了。
2、Redisson如何解决一致性问题【MultiLock联锁】
原理:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取锁成功 优点:所有锁中最安全的实现方法 缺点:运维成本高、实现复杂
既然主从关系是导致一致性问题的原因,那么Redisson取消主从,那么所有的节点都是独立的redisson节点,相互之间没有任何关系,都可以做读写操作。那么获取锁时,依次在多个节点中进行获取锁操作。 可用性问题: 即使某一个节点宕机后,那么其他节点都有锁的信息。 更高的可用性: 在每一个节点后面加入slave节点,做主从同步。
即使加入了主从同步,也不会出现安全问题。 假设某一台master宕机后,刚好并没有完成数据同步。那么slave变成了master主节点。没有锁标识。 有一个线程趁虚而入,想要获取锁,并不能获取成功。因为只有在每一个节点都拿到锁才能获取成功。 只要任意一个节点存活中,其他线程就不能拿到锁,就不会出现锁失效的问题。
优点:保留了主从 机制,确保了整个redis的高可用特性,避免了主从一致引发的锁失效问题。
|