目录
一、背景
二、自定义实现
三、Redisson框架实现
一、背景
关于分布式锁就不多说了,现在出现了一种场景,如果在分布式锁中,业务代码没有执行完,然后锁的键值过期了,那么其他的JVM就可能会获取到锁而发生幂等的问题。那么这种情况怎么解决呢?
1、如果线程1获取到了锁,那么在业务代码还没执行完时,redis键值过期了,那么就会发生幂等问题。解决的思路是,当线程1获取到锁之后,开启一个线程去监听线程1是否执行完成,如果没有执行完成,就去延长键值的过期时间。
2、如果因为某些原因(比如代码bug,网络不稳定),锁一直不能释放,然后就要一直进行延长过期时间么?答案是否定的,我们可以设置延长时间的次数,如果超过设定的次数还是失败,就自动释放锁,然后回滚业务。
3、释放锁时需要考虑的问题,谁加的锁,就让谁来释放锁。
二、自定义实现
锁信息实体类
package com.xiaojie.lock;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author xiaojie
* @version 1.0
* @description: 记录锁信息
* @date 2022/6/9 17:44
*/
public class RedisLockInfo {
/**
* 锁的状态 state为true 则表示获取锁成功
*/
private boolean state;
/**
* 锁的id
*/
private String lockId;
/**
* 锁的持有线程
*/
private Thread lockThread;
/**
* 锁的过期时间
*/
private Long expire;
/**
* 续命次数
*/
private AtomicInteger lifeCount;
/**
* 获取锁次数
*/
private AtomicInteger lockCount;
// 锁的可重入次数
public RedisLockInfo(String lockId, Thread lockThread, Long expire) {
state = true;
this.lockId = lockId;
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
}
public RedisLockInfo(Thread lockThread, Long expire) {
this.lockThread = lockThread;
this.expire = expire;
lifeCount = new AtomicInteger(0);
lockCount = new AtomicInteger(0);
state = true;
}
public RedisLockInfo() {
state = true;
}
public String getLockId() {
return lockId;
}
public boolean isState() {
return state;
}
public Thread getLockThread() {
return lockThread;
}
public Long getExpire() {
return expire;
}
//重试的次数
public Integer getLifeCount() {
return lifeCount.incrementAndGet();
}
//锁获取的次数
public Integer incrLockCount() {
return lockCount.incrementAndGet();
}
//释放锁次数
public Integer decreLockCount() {
return lockCount.decrementAndGet();
}
}
获取锁的方法和续命方法
package com.xiaojie.lock.impl;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.lock.RedisLockInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author xiaojie
* @version 1.0
* @description: 实现分布式锁的实现类
* @date 2022/6/9 18:05
*/
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String redisLockKey = "redisLockKey";
/**
* 缓存redis锁
*/
private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
/**
* 重试时间
*/
private Long timeout = 3000L;
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@Override
public boolean tryLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
//判断是不是当前线程获取到了锁
if (null != redisLockInfo && redisLockInfo.isState()) {
//证明已经获取到了锁,锁重入
log.info(">>>>>>>>>>>>>>>>>已经获取到了锁");
redisLockInfo.incrLockCount(); //获取锁的次数加1
return true;
}
Long startTime = System.currentTimeMillis();
Long lockExpire = 30l; //键值的过期时间
//重试获取锁
for (; ; ) {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", lockExpire, TimeUnit.SECONDS);
if (lock) {
//获取锁成功
log.info(">>>>>>>>>>>>>>获取锁成功");
lockCacheMap.put(currentThread, new RedisLockInfo(currentThread, lockExpire));
return true;
}
// 控制一个超时的时间
Long endTime = System.currentTimeMillis();
if (endTime - startTime > timeout) {
log.info("在3秒内已经过了重试时间了>>>>>>>>>>>");
return false;
}
// 继续循环
try {
Thread.sleep(10); //休眠避免CPU飙高
} catch (Exception e) {
}
}
}
@Override
public boolean releaseLock() {
Thread currentThread = Thread.currentThread();
RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
if (null != redisLockInfo && redisLockInfo.isState()) {
if (redisLockInfo.decreLockCount() <= 0) {
lockCacheMap.remove(currentThread);
//删除键值
stringRedisTemplate.delete(redisLockKey);
return true;
}
}
return false;
}
public RedisLockImpl() {
//开始定时任务实现续命,每3秒续命一次
this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 3, TimeUnit.SECONDS);
}
//定义监听线程
class LifeExtensionThread implements Runnable {
@Override
public void run() {
log.info("开始续命线程>>>>>>>>>>>>>>>>>>>>>>>>>");
if (lockCacheMap.size() > 0) {
lockCacheMap.forEach((k, lockInfo) -> {
try {
Thread lockServiceThread = lockInfo.getLockThread();
if (lockServiceThread.isInterrupted()) {
//判断线程是否执行完毕
log.info("当前线程已经被停止>>>>>>>>>>>>>>>");
lockCacheMap.remove(k); //从缓存中移除线程
return;
}
// 续命键值过期时间
Integer lifeCount = lockInfo.getLifeCount();
if (lifeCount > 5) {
log.info(">>>>>>>>>>>>>>>>>您已经续命了多次当前线程还没有释放锁,现在主动将该锁释放 避免死锁的问题");
// 1.回滚业务(例如事务回滚)
// 2.释放锁
releaseLock();
// 3.将该线程主动停止
lockServiceThread.interrupt();
// 4.移除监听
lockCacheMap.remove(k);
return;
}
//提前实现续命 延长过期key的时间
stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
}
模拟效果使用 Jemeter模拟秒杀的业务
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒杀的接口
* @date 2022/6/9 22:55
*/
@RestController
public class SeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedisLock redisLock;
@GetMapping("/secKill")
public String secKill(Long id) {
try {
if (!redisLock.tryLock()) {
//获取锁失败
return "活动太火了,请稍后重试";
}
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "该商品没有秒杀活动";
}
if (goods.getStock() <= 0) {
return "商品库存为空了。。。。。";
}
//减库存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒杀成功" : "秒杀失败";
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
redisLock.releaseLock();
}
return "fail";
}
}
三、Redisson框架实现
图片来源? ?https://blog.csdn.net/menxu_work/article/details/123827526 ?
1、Redisson向redis写入键值的时候不再使用setNx的命令,而是使用Lua脚本的形式写入
2、写入键值成功之后会设置一个后台线程(看门狗线程)来对键值进行续命,默认值是每10秒续命一次,续命时间为30秒
3、键值的类型为hash类型,如下图,其中1是锁入的次数
?4、释放锁的时候,也是通过Lua脚本的方式,当锁入的次数小于0时,就会删除键值。
源码分析
加锁代码
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
//如果没有过期时间,证明键值不存在,直接返回
return;
}
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
//键值存在,自旋方式去获取锁
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//取消订阅
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + //判断键值是否存在,==0不存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //创建哈希类型键值,并设置值为1
"redis.call('pexpire', KEYS[1], ARGV[1]); " +//设置过期时间 ,默认是30s
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//键值存在
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//将键值的value值加1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间为30s
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);", //键值存在,并且不是重入锁,则返回键值剩余存活时间
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
释放锁的代码
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //键值不存在则表示锁已经释放了
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //简直存在,则键值减1
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + //如果减1后键值还是存在,则重新设置过期时间
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " + //删除键值
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
续命的代码
?模拟秒杀代码实现
package com.xiaojie.redisson;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author xiaojie
* @version 1.0
* @description: reddisson配置
* @date 2022/6/10 2:05
*/
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
//设置看门狗时间
// config.setLockWatchdogTimeout(30000L);
//设置单机版本redis
config.useSingleServer().setAddress("redis://" + host + ":" + port);
//设置密码
config.useSingleServer().setPassword(password);
//设置集群的方式
// config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
// config.useClusterServers().addNodeAddress("redis://" + host2 + ":" + port2);
//添加主从配置
// config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
return Redisson.create(config);
}
}
秒杀的接口
package com.xiaojie.controller;
import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author xiaojie
* @version 1.0
* @description: 秒杀的接口
* @date 2022/6/9 22:55
*/
@RestController
public class RedissonSeckillController {
@Autowired
private GoodsMapper goodsMapper;
@Autowired
private RedissonClient redissonClient;
@GetMapping("/secKillRedisson")
public String secKillRedisson(Long id) {
RLock rLock = null;
try {
rLock = redissonClient.getLock(id + "");
rLock.lock(); //加锁,加几次锁,finally释放锁的时候就要释放几次锁
Goods goods = goodsMapper.selectById(id);
if (null == goods) {
return "该商品没有秒杀活动";
}
if (goods.getStock() <= 0) {
return "商品库存为空了。。。。。";
}
//减库存
Integer result = goodsMapper.deceGoods(id);
return result > 0 ? "秒杀成功" : "秒杀失败";
} catch (Exception e) {
e.printStackTrace();
} finally {
rLock.unlock();
}
return "fail";
}
}
|