为什么要使用分布式锁
如果在一个分布式系统中,我们从数据库中读取一个数据,然后修改保存,这种情况很容易遇到并发问题。因为读取和更新保存不是一个原子操作,在并发时就会导致数据的不正确。这种场景其实并不少见,比如电商秒杀活动,库存数量的更新就会遇到。如果是单机应用,直接使用本地锁就可以避免。如果是分布式应用,本地锁派不上用场,这时就需要引入分布式锁来解决。
由此可见分布式锁的目的其实很简单,就是为了保证多台服务器在执行某一段代码时保证只有一台服务器执行。
满足分布式锁的必要条件
- 互斥性。在任何时刻,保证只有一个客户端持有锁。
- 不能出现死锁。如果在一个客户端持有锁的期间,这个客户端崩溃了,也要保证后续的其他客户端可以上锁。
- 保证上锁和解锁都是同一个客户端。
实现分布式锁的几种方式
- 使用MySQL,基于唯一索引。
- 使用ZooKeeper,基于临时有序节点。
- 使用Redis,基于set命令(2.6.12 版本开始)。
Redis实现分布式锁的命令
SET key value [EX seconds] [PX milliseconds] [NX|XX]
可选参数 从 Redis 2.6.12 版本开始, SET 命令的行为可以通过一系列参数来修改:
EX second :设置键的过期时间为 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。
PX millisecond :设置键的过期时间为 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。
NX :只在键不存在时,才对键进行设置操作。 SET key value NX 效果等同于 SETNX key value 。
XX :只在键已经存在时,才对键进行设置操作。
加锁:使用SET key value [PX milliseconds] [NX] 命令,如果key不存在,设置value,并设置过期时间(加锁成功)。如果已经存在lock(也就是有客户端持有锁了),则设置失败(加锁失败)。
解锁:使用del命令,通过删除键值释放锁。释放锁之后,其他客户端可以通过set命令进行加锁。
分布式锁考虑的问题
互斥性。在任何时刻,保证只有一个客户端持有锁
redis命令是原子性的,只要客户端调用redis的命令SET key value [PX milliseconds] [NX] 执行成功,就算加锁成功了
不能出现死锁。如果在一个客户端持有锁的期间,这个客户端崩溃了,也要保证后续的其他客户端可以上锁。
set命令px设置了过期时间,key过期失效了,就能避免死锁了
保证上锁和解锁都是同一个客户端。
释放锁(删除key)的时候,只要确保是当前客户端设置的value才去删除key即可,采用lua脚本来实现
在Redis中,执行Lua语言是原子性,也就是说Redis执行Lua的时候是不会被中断的,具备原子性,这个特性有助于Redis对并发数据一致性的支持。
java代码实现Redis分布式锁
先把需要的jar包引入
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.3</version>
</dependency>
加锁设置参数的实体类
package com.ahysf.common.entity;
import lombok.Data;
@Data
public class LockParam {
private String lockKey;
private Long tryLockTime;
private Long holdLockTime;
public LockParam(String lockKey) {
this(lockKey, 1000 * 3L, 1000 * 5L);
}
public LockParam(String lockKey, Long tryLockTime) {
this(lockKey, tryLockTime, 1000 * 5L);
}
public LockParam(String lockKey, Long tryLockTime, Long holdLockTime) {
this.lockKey = lockKey;
this.tryLockTime = tryLockTime;
this.holdLockTime = holdLockTime;
}
}
Redis分布式具体代码实现
package com.ahysf.common.utils;
import com.ahysf.common.entity.LockParam;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.UUID;
@Slf4j
public class RedisLock {
private final static String prefix_key = "redisLock:";
private final static String unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
private final static Long unLockSuccess = 1L;
private LockParam lockParam;
private Long tryLockEndTime;
private String redisLockKey;
private String redisLockValue;
private Boolean holdLockSuccess = Boolean.FALSE;
private Jedis jedis;
private Jedis getJedis() {
return this.jedis;
}
private void closeJedis(Jedis jedis) {
jedis.close();
jedis = null;
}
public RedisLock(LockParam lockParam) {
if (lockParam == null) {
new RuntimeException("lockParam is null");
}
if (lockParam.getLockKey() == null || lockParam.getLockKey().trim().length() == 0) {
new RuntimeException("lockParam lockKey is error");
}
this.lockParam = lockParam;
this.tryLockEndTime = lockParam.getTryLockTime() + System.currentTimeMillis();
this.redisLockKey = prefix_key.concat(lockParam.getLockKey());
this.redisLockValue = UUID.randomUUID().toString().replaceAll("-", "");
jedis = new Jedis("127.0.0.1", 6379);
}
public boolean lock() {
while (true) {
if (System.currentTimeMillis() > tryLockEndTime) {
return false;
}
holdLockSuccess = tryLock();
if (Boolean.TRUE.equals(holdLockSuccess)) {
return true;
}
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private boolean tryLock() {
try {
String result = getJedis().set(redisLockKey, redisLockValue, "NX", "PX", lockParam.getHoldLockTime());
if ("OK".equals(result)) {
return true;
}
} catch (Exception e) {
log.warn("tryLock failure redisLockKey:{} redisLockValue:{} lockParam:{}", redisLockKey, redisLockValue, lockParam, e);
}
return false;
}
public Boolean unlock() {
Object result = null;
try {
if (Boolean.TRUE.equals(holdLockSuccess)) {
result = getJedis().eval(unLockScript, Collections.singletonList(redisLockKey), Collections.singletonList(redisLockValue));
if (unLockSuccess.equals(result)) {
return true;
}
}
} catch (Exception e) {
log.warn("unlock failure redisLockKey:{} redisLockValue:{} lockParam:{} result:{}", redisLockKey, redisLockValue, lockParam, result, e);
} finally {
this.closeJedis(jedis);
}
return false;
}
}
Redis分布式锁使用
package com.ahysf.common.utils;
import com.ahysf.common.entity.LockParam;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestRedisLock {
static String lockKey = "666";
public static void main(String[] args) throws InterruptedException {
log.info("下面测试两个线程同时,抢占锁的结果");
Thread thread1 = new Thread(() -> {
testRedisLock();
});
thread1.setName("我是线程1");
Thread thread2 = new Thread(() -> {
testRedisLock();
});
thread2.setName("我是线程2");
thread1.start();
thread2.start();
Thread.sleep(1000 * 20);
log.info("-----------------我是一条分割线----------------");
log.info("");
log.info("");
log.info("");
log.info("下面是测试 一个线程获取锁成功后,由于业务执行时间超过了设置持有锁的时间,是否会把其他线程持有的锁给释放掉");
Thread thread3 = new Thread(() -> {
testRedisLock2();
});
thread3.setName("我是线程3");
thread3.start();
Thread.sleep(1000 * 1);
Thread thread4 = new Thread(() -> {
testRedisLock();
});
thread4.setName("我是线程4");
thread4.start();
}
public static void testRedisLock() {
LockParam lockParam = new LockParam(lockKey);
lockParam.setTryLockTime(2000L);
lockParam.setHoldLockTime(1000 * 10L);
RedisLock redisLock = new RedisLock(lockParam);
try {
Boolean lockFlag = redisLock.lock();
log.info("加锁结果:{}", lockFlag);
if (lockFlag) {
try {
Thread.sleep(1000 * 5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
log.info("testRedisLock e---->", e);
} finally {
boolean unlockResp = redisLock.unlock();
log.info("释放锁结果:{}", unlockResp);
}
}
public static void testRedisLock2() {
LockParam lockParam = new LockParam(lockKey);
lockParam.setTryLockTime(1000 * 2L);
lockParam.setHoldLockTime(1000 * 2L);
RedisLock redisLock = new RedisLock(lockParam);
try {
Boolean lockFlag = redisLock.lock();
log.info("加锁结果:{}", lockFlag);
if (lockFlag) {
try {
Thread.sleep(1000 * 10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
log.info("testRedisLock e---->", e);
} finally {
boolean unlockResp = redisLock.unlock();
log.info("释放锁结果:{}", unlockResp);
}
}
}
|