1、分布式锁
在应用开发中,特别是web工程开发,通常都是并发编程,不是多进程就是多线程。这种场景下极易出现线程并发性安全问题,此时不得不使用锁来解决问题。在多线程高并发场景下,为了保证资源的线程安全问题,jdk为我们提供了synchronized关键字和ReentrantLock可重入锁,但是它们只能保证一个工程内的线程安全。在分布式集群、微服务、云原生横行的当下,如何保证不同进程、不同服务、不同机器的线程安全问题,jdk并没有给我们提供既有的解决方案。此时,我们就必须借助于相关技术手动实现了。目前主流的实现有以下方式:
- 基于mysql关系型实现
- 基于redis非关系型数据实现
- 基于zookeeper/etcd实现
2、传统锁
2.1、经典问题——卖票
多线程并发安全问题最典型的代表就是超卖现象。库存在并发量较大情况下很容易发生超卖现象,一旦发生超卖现象,就会出现多成交了订单而发不了货的情况。
场景:
车票余量为5时,用户A和B同时来购买,此时查询余数都为5,余票充足则开始卖票:
用户A:update db_stock set stock = stock - 1 where id = 1
用户B:update db_stock set stock = stock - 1 where id = 1
并发情况下,更新后的结果可能是4,而实际的最终余票应该是3才对
2.2、并发导致超卖现象
首先我们构建好一个springboot程序,连接好数据库,新增建表语句,如下
CREATE TABLE `db_stock` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
`stock_code` varchar(255) DEFAULT NULL COMMENT '仓库编号',
`count` int(11) DEFAULT NULL COMMENT '库存量',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
而且我们直接实例化一个对象,并且给上初始值5000,我们单个直接调用模拟卖票操作。
public synchronized void reduce() {
stock.setStocks(stock.getStocks() - 1);
System.out.println("剩余 =====>>>>> " + stock.getStocks());
}
而之后我们直接使用JMeter压力测试工具来进行模拟多线程操作,直接执行5000次查看最后操作之后的结果
 可以看到我们直接并发5000个请求,到最后并不会将余数5000减为0,这就是并发导致的数据问题,
2.3、JVM锁
上面2.2存在的并发问题这该如何进行解决呢?我们只需要给这一段代码上锁即可,有以下两种方式,分别是直接加上synchronized 将方法变为原子操作,以及通过ReentrantLock给代码上锁。
public synchronized void reduce() {
stock.setStocks(stock.getStocks() - 1);
System.out.println("剩余 =====>>>>> " + stock.getStocks());
}
private ReentrantLock lock = new ReentrantLock();
public void reduceLock() {
lock.lock();
try {
stock.setStocks(stock.getStocks() - 1);
System.out.println("剩余 =====>>>>> " + stock.getStocks());
} finally {
lock.unlock();
}
}
JVM锁原理: 添加synchronized关键字之后,StockService就具备了对象锁,由于添加了独占的排他锁,同一时刻只有一个请求能够获取到锁,并减库存。此时,所有请求只会one-by-one执行下去,也就不会发生超卖现象。
2.4、事务与JVM锁
开启事务也会导致锁失效,当每一次操作进入代码之后,由于开启了事务,当有一个请求获取锁之后,其余请求都会进行阻塞住,之后当请求操作完成之后释放锁。但是这个时候还没有提交事务,另外一个请求就已经重新获取锁了,这是这个请求获取的值还是没有进行改变的值。这样就会导致锁失效
@Transactional
public synchronized void reduceByMysqlAndTransactional() {
StockMysql stockMysql = stockMysqlMapper.selectOne(new QueryWrapper<StockMysql>().eq("product_code", "DN001"));
if (stockMysql.getCount() > 0) {
stockMysql.setCount(stockMysql.getCount() - 1);
System.out.println("剩余 ====>>>>>> " + stockMysql.getCount());
stockMysqlMapper.updateById(stockMysql);
}
}
如何解决事务导致锁失效的问题:我们只需要指定事务的方式
@Transactional(isolation = Isolation.READ_UNCOMMITTED)
2.5、MySql锁
除了使用jvm锁之外,还可以使用数据锁:悲观锁 或者 乐观锁
-
一个sql:直接更新时判断,在更新中判断库存是否大于0 update table set surplus = (surplus - buyQuantity) where id = 1 and (surplus - buyQuantity) > 0 ; -
悲观锁:在读取数据时锁住那几行,其他对这几行的更新需要等到悲观锁结束时才能继续 。 select … for update -
乐观锁:读取数据时不锁,更新时检查是否数据已经被更新过,如果是则取消当前更新进行重试。 version 或者 时间戳(CAS思想)。
2.5.1、一个SQL
在这里部分场景我们可以使用一个SQL解决数据并发问题,但是很多业务场景并不能直接通过一条sql来进行实现
@Update("update t_stock set count = count - #{reduce} where product_code = #{productCode} and count >=1")
int updateByLock(@Param("reduce") Integer reduce, @Param("productCode") String productCode);
2.5.2、悲观锁
2.5.2.1、表级锁
在mysql当中我们开启事务,进行更新,这时我们的表是一个最简单表,只有一个主键,其他的都是普通字段,在一个终端开启事务更新数据,但是这时我们不提交也不回滚不进行任何操作,并且同时新开一个终端也来进行更新操作,这时到这里更新会被锁住,不会直接更新,当我们将事务结束之后,这个更新才会执行,可以看到这一条更新语句是十几秒之后才被执行的,由此我们可以得到一个结论,该事务锁住了整张表。
 2.5.2.2、行级锁
而当我们给表增加索引之后,并且来根据这个索引来进行查询的时候,这个时候就不会将整个表给锁住,只会锁住对应的行
表结构:
CREATE TABLE `t_stock` (
`id` int(16) unsigned NOT NULL AUTO_INCREMENT,
`product_code` varchar(16) NOT NULL,
`wear_house` varchar(16) NOT NULL,
`count` int(16) NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `pcode` (`product_code`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8
只会锁住对应的行,这时更新另外的数据可以直接更新。

select * from t_stock where id =1 for update;
select * from t_stock where id =2;
2.5.3、乐观锁
乐观锁( Optimistic Locking ) 相对悲观锁而言,乐观锁假设认为数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则重试。那么我们如何实现乐观锁呢
使用数据版本(Version)记录机制实现,这是乐观锁最常用的实现 方式。一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加一。当我们提交更新的时候,判断数据库表对应记录 的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新。
给表添加version字段(用来记录和控制版本):
ALTER TABLE `t_stock` ADD VERSION INT(32) NOT NULL
添加代码实现:
public void reduceByMysqlLockOptimistic() throws InterruptedException {
StockMysql stock = stockMysqlMapper.selectOne(new QueryWrapper<StockMysql>().eq("product_code", "DN001"));
Integer version = stock.getVersion();
stock.setCount(stock.getCount() - 1);
stock.setVersion(stock.getVersion() + 1);
if(stockMysqlMapper.update(stock,new UpdateWrapper<StockMysql>().eq("id",stock.getId()).eq("version",version)) == 0){
Thread.sleep(20);
this.reduceByMysqlLockOptimistic();
}
}
在这里我们使用JMeter来进行压力测试,并且我们分别并发500次和5000次来进行控制变量测试吞吐量。
  说明乐观锁在并发量越大的情况下,性能越低(因为需要大量的重试);并发量越小,性能越高。
乐观锁存在的问题: 当mysql服务是部署的主从复制(读写分离)这个时候读和写的分开进行操作的,而在进行mysql数据同步的时候也需要时间,这个时候我们每次读取的数据不是最新的概率就会更大,就会导致乐观锁重试的次数增多,cpu占用率飙升
2.6、Redis锁
redis分布式缓存 参考这篇文章,这里不做过多阐述了。单击前往
在redis当中我们添加一个key,之后我们通过JMeter进行压力测试,压测之后,我们再看redis当中的stock的值,可以看到在进行并发之后还是会存在数据问题。
public void redisStock(){
String stock = stringRedisTemplate.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if(stock != null && integer > 0){
stringRedisTemplate.opsForValue().set("stock",String.valueOf(
}
}
还是一样我们使用JMeter来进行压力测试,并发2000个请求之后来看一下redis当中的stock数据是不是预想的减了2000。很显然不会,并发修改还是会存在问题,那如何解决呢?再后文会提供解决方案。

2.6.1、解决方案1 - JVM本地锁
这里直接使用synchronized关键字和ReentrantLock上锁即可。
2.6.2、解决方案2 - 乐观锁
首先我们熟悉一下Redis的事务:
在Redis中开启事务的命令是 multi 命令, 而执行事务的命令是 exec 命令。multi 到 exec 命令之间的 Redis 命令将采取进入队列的形式,直至 exec 命令的出现,才会一次性发送队列里的命令去执行,而在执行这些命令的时候其他客户端就不能再插入任何命令了。如果回滚事务,可以使用 discard 命令取消事务中所有命令,使事务中的方法不会被执行了。
代码实现:
public void redisStockOptimistic() {
stringRedisTemplate.execute(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations redisOperations) throws DataAccessException {
redisOperations.watch("stock");
String stock = (String) redisOperations.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if (stock != null && integer > 0) {
redisOperations.multi();
redisOperations.opsForValue().set("stock", String.valueOf(
List exec = redisOperations.exec();
if (exec == null || exec.size() == 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
redisStockOptimistic();
}
return exec;
}
return null;
}
});
}
在开启Redis的事务之后我们同样的来进行2000条并发测试,注意对应的吞吐量以及最后的stock数据是否和预期的一样。可以看到吞吐量很明显的下降了,这是因为多并发会导致事务执行时,其余操作都会阻塞。
 使用乐观锁之后普遍会存在的问题,和mysql乐观锁一样当并发量上去之后,重试的次数会越来越多导致性能问题
3、Redis分布式锁
3.1、实现一个redis分布式锁
借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。
public void redisDistributedLock() {
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1");
if (!lock) {
try {
Thread.sleep(100);
redisDistributedLock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
try {
String stock = stringRedisTemplate.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if (stock != null && integer > 0) {
stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));
}
} finally {
stringRedisTemplate.delete("lock");
}
}
}
3.2、递归优化
在进行递归的时候,每次都会往栈里面加一个方法,并发过大的时候可能会导致栈内存溢出,我们可以通过while循环来进行优化
while(lock){
try {
String stock = stringRedisTemplate.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if (stock != null && integer > 0) {
stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));
}
} finally {
stringRedisTemplate.delete("lock");
}
}
3.3、防止死锁
首先我们了解一下什么是死锁。setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法锁无法释放(死锁)。那如何解决死锁问题呢?只需要给锁设置过期时间,自动释放锁即可。
首先我们熟悉一下在redis当中设置锁的超时时间的命令
set lock 1
expire lock 20
set lock 1 ex 3 nx
ttl lock
然后既然我们知道了,那就好办了,我们直接在代码当中加锁的超时,我们直接在这里加上一个超时的设置。
 但是我们代码直接加在这里还是可以继续优化的,我们不能保证加锁和设置锁的超时时间的原子性,所以我们可以直接在创建锁的时候一起设置超时时间
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "1", 3, TimeUnit.SECONDS);
3.4、防误删
这一个存在的问题就是,当我们第一次请求进来之后设置一个lock锁,此时锁的销毁时间是3秒,但是后续的业务处理却花费了5秒,这时在3秒的时候就释放了锁,这时第一个请求的业务还没执行完,第二个请求也获取锁来开始执行业务了,就会导致业务混乱
解决方案:给每一个锁设置一个uuid来进行标识,释放锁时只释放自己的锁
public void redisLockUUID() {
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
while (!lock) {
try {
Thread.sleep(100);
redisLockUUID();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
} finally {
if (StringUtils.equals(stringRedisTemplate.opsForValue().get("lock"), uuid)) {
stringRedisTemplate.delete("lock");
}
}
}
3.5、Redis当中的LUA脚本
redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。如果redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis 也保证脚本会以原子性(atomic)的方式执行: 当某个脚本正在运行的时候,不会有其他脚本或 Redis 命令被执行。 这和使用 MULTI/ EXEC 包围的事务很类似。
public void redisLua() {
String uuid = UUID.randomUUID().toString();
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, TimeUnit.SECONDS);
while (!lock) {
try {
Thread.sleep(100);
redisLua();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
try {
String stock = stringRedisTemplate.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if (stock != null && integer > 0) {
stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));
}
} finally {
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"), uuid);
}
}
3.6、可重入锁
由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
用以下代码来进行简单说明:首先代码进来之后执行a方法,a方法直接加上一个锁,此时加锁之后再执行b方法,而b方法这里又同样的需要设置一个同名锁a,这个时候锁是不可重入的,b永远无法获取锁,这就很奇怪,我要自己释放我自己?
public void a(){
b();
}
public void b(){
}
3.6.1、加锁
使用lua脚本来进行实现可重入锁,这里用到的是redis当中的set数据类型,先使用exists用来判断锁是否存在,之后通过hset命令往hash里面添加键值数据,并且同时通过expire设置(更新)过期时间,之后再加判断,使用hsxists判断锁的hash是否存在,存在即重入锁,并且更新过期时间
if redis.call('exists','lock') == 0
then
redis.call('hset','lock',uuid,1)
redis.call('expire','lock',30)
return 1
elseif redis.call('hsxists','lock',uuid) == 1
then
redis.call('hsxists','expirelock',uuid,1)
redis.call('expire','lock',30)
return 1
else
return 0
end
将这一段脚本的判断优化一下,并且将对应的值更换成keys和argv用来后续动态传递,脚本就变成了这样
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
then
redis.call('hincrby', KEYS[1], ARGV[1], 1);
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1。
3.6.2、解锁
解锁操作,首先通过uuid来判断该锁是否存在,不存在直接返回-1用来标识解锁失败,而该锁存在的只需对这个锁-1,减到0的时候将锁给释放掉。
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then
return -1;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then
return 0;
else
redis.call('del', KEYS[1]);
return 1;
end;
3.6.3、代码实现
首先我们定义一个工具类用来对redis分布式锁进行加锁解锁统一管理。首先继承Lock接口实现对应方法,在构造方法当中需要由外部传入对应的三个参数,分别为操作redis、锁名称、uuid。在这里主要方法为tryLock(time,unit)在该方法中首先对入参时间进行判断,之后直接通过lua脚本来进行做加锁操作,在unlock解锁操作也如此。
public class DistributedRedisLock implements Lock {
private StringRedisTemplate stringRedisTemplate;
private String lockName;
private String uuid;
private long expire = 30;
DistributedRedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuid = uuid;
}
@Override
public void lock() {
this.tryLock();
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
try {
return tryLock(-1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
if (time != -1) {
this.expire = unit.toSeconds(time);
}
String script = "if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1) thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;else return 0; end";
while (!stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {
Thread.sleep(100);
}
return true;
}
@Override
public void unlock() {
String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return -1; elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0; else redis.call('del', KEYS[1]); return 1 end";
Long execute = stringRedisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);
if (execute == -1) {
throw new IllegalMonitorStateException("恶意释放锁");
}
}
@Override
public Condition newCondition() {
return null;
}
public String getUuid() {
return uuid + Thread.currentThread().getId();
}
}
并且我们加入一个统一管理redisLock的公共类来进行管理,后续使用只需要注入该类即可
@Component
public class DistributedLockClient {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private String uuid;
public DistributedLockClient() {
this.uuid = UUID.randomUUID().toString();
}
public DistributedRedisLock getRedisLock(String lockName) {
return new DistributedRedisLock(stringRedisTemplate, lockName, uuid);
}
}
最后直接注入该类进行加锁解锁操作
public void reentrantLock() {
DistributedRedisLock lock = distributedLockClient.getRedisLock("stock_lock");
lock.tryLock();
try {
String stock = stringRedisTemplate.opsForValue().get("stock").toString();
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
stringRedisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
} finally {
lock.unlock();
}
}
3.7、自动续期
lua脚本
if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then
redis.call('expire', KEYS[1], ARGV[2]);
return 1;
else
return 0;
end
新增自动续期代码:
public void renewExpire() {
String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +
"then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";
new Timer().schedule(new TimerTask() {
@Override
public void run() {
if (stringRedisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getUuid(), String.valueOf(expire))) {
renewExpire();
}
}
}, this.expire * 1000 / 3);
}
并且再设置uuid的时候,需要修改一下,应该在构造函数当中就设置对应的uuid,而不是在后续通过方法拼接
DistributedRedisLock(StringRedisTemplate stringRedisTemplate, String lockName, String uuid) {
this.stringRedisTemplate = stringRedisTemplate;
this.lockName = lockName;
this.uuid = uuid + Thread.currentThread().getId();
}
3.8、Redisson
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
Github 托管地址:https://github.com/redisson/redisson
3.8.1、Redisson Hello World
SpringBoot整合Redisson,首先先整合redisson的依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.17.6</version>
</dependency>
而后我们采用统一配置加载redisson的配置
@Bean
public RedissonClient redisConfig() {
Config config = new Config();
config.setTransportMode(TransportMode.EPOLL);
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:7181");
return Redisson.create(config);
}
而我们可以直接使用redisson提供的方法来进行操作加锁与解锁
@Autowired
private RedissonClient redissonClient;
public void stockByRedisson(){
RLock lock = redissonClient.getLock("lock");
lock.lock();
try{
String stock = stringRedisTemplate.opsForValue().get("stock");
Integer integer = Integer.valueOf(stock);
if (stock != null && integer > 0) {
stringRedisTemplate.opsForValue().set("stock", String.valueOf(--integer));
}
}finally {
lock.unlock();
}
}
3.8.2、Redisson可重入锁
如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。
RLock lock = redissonClient.getLock("lock");
lock.lock(10, TimeUnit.SECONDS);
3.8.3、公平锁
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redissonClient.getFairLock("fairLock");
3.8.4、联锁
基于Redis的Redisson分布式联锁RedissonMultiLock对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的Redisson实例。同时加锁:lock1 lock2 lock3,所有的锁都上锁成功才算成功。
RLock lock1 = redissonClient.getLock("lock1");
RLock lock2 = redissonClient.getLock("lock2");
RLock lock3 = redissonClient.getLock("lock3");
RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1, lock2, lock3);
3.8.5、红锁
和联锁一样,在这里还提供了一个RedissonRedLock红锁类,这个和联锁的区别在于,红锁只需要当大部分锁获取成功即为成功
RedissonRedLock redissonRedLock = new RedissonRedLock(lock1, lock2, lock3);
3.8.6、读写锁
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock 接口。其中读锁和写锁都继承了RLock接口。分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
public void readLock(){
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");
rwLock.readLock().lock(10,TimeUnit.SECONDS);
System.out.println("read lock");
}
public void writeLock(){
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");
rwLock.writeLock().lock(10,TimeUnit.SECONDS);
System.out.println("write lock");
}
- 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
- 同时访问读:不用等待
- 先写后读:读要等待(约10s)写完成
- 先读后写:写要等待(约10s)读完成
3.8.7、信号量
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。而使用信号量就适用于服务器容量不够承受大并发请求,就可以通过信号量还进行限流操作
先康康Semaphore这个类占用与释放的一段demo代码
public static final int max = 6;
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < max; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "占用 ==== " + new Date());
TimeUnit.SECONDS.sleep(new Random().nextInt(30));
semaphore.release();
System.out.println(Thread.currentThread().getName() + "释放 ==== " + new Date());
} catch (Exception e) {
throw new RuntimeException(e);
}
}, i + "").start();
}
}
而在redisson当中给我们封装好了对应的信号量,我们直接使用即可
public void semaphoreLock(){
RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
semaphore.trySetPermits(3);
try {
semaphore.acquire();
TimeUnit.SECONDS.sleep(5);
System.out.println(new Date());
semaphore.release();
}catch (Exception e){
e.printStackTrace();
}
}
3.8.8、闭锁
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。适用于一个线程等待一组线程执行
public static final int max = 6;
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < max; i++) {
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 3);
System.out.println(Thread.currentThread().getName() + "递减释放 ==== " + new Date());
countDownLatch.countDown();
} catch (Exception e) {
throw new RuntimeException(e);
}
}, i + "").start();
}
countDownLatch.await();
System.out.println("over");
}
而在redisson当中给我们封装好了对应的闭锁,我们直接使用即可
public void countDownLatchLock() throws InterruptedException {
RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
countDownLatch.trySetCount(10);
countDownLatch.await();
System.out.println("over");
}
public void countDown(){
RCountDownLatch countDownLatch = redissonClient.getCountDownLatch("countDownLatch");
countDownLatch.countDown();
}
4、ZooKeeper分布式锁
在这里对于ZooKeeper的安装、使用就不过多赘述了,直接参考该文章:ZooKeeper详解
4.1、使用java Api操作ZooKeeper
首先我们直接导入官方提供的依赖。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
使用api操作zk节点
public static void main(String[] args) throws InterruptedException, KeeperException {
ZooKeeper zooKeeper = null;
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
zooKeeper = new ZooKeeper("localhost:2181", 30000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (Event.KeeperState.SyncConnected.equals(event.getState())
&& Event.EventType.None.equals(event.getType())
) {
System.out.println("get link ====>>>> " + event.getState());
} else {
System.out.println("event listen === >>>>> " + event.getType());
}
}
});
String s = zooKeeper.create("/dia", "huangdiaomao".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("create node dia ====>>>> " + s);
Stat exists = zooKeeper.exists("/dia", true);
System.out.println("exists node ====>>>> " + exists.toString());
byte[] data = zooKeeper.getData("/dia", false, null);
System.out.println("dia node res ===== >>>> " + new String(data));
List<String> children = zooKeeper.getChildren("/zookeeper", true);
for (String child : children) {
System.out.println("child ==== >>>> " + child);
}
zooKeeper.setData("/dia", "change data".getBytes(), exists.getVersion());
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
4.2 实现一个ZK分布式锁
4.2.1、思路分析
分布式锁的步骤:
- 获取锁:create一个节点
- 删除锁:delete一个节点
- 重试:没有获取到锁的请求重试
参照redis分布式锁的特点:
- 互斥 排他
- 防死锁:
- 可自动释放锁(临时节点) :获得锁之后客户端所在机器宕机了,客户端没有主动删除子节点;如果创建的是永久的节点,那么这个锁永远不会释放,导致死锁;由于创建的是临时节点,客户端宕机后,过了一定时间zookeeper没有收到客户端的心跳包判断会话失效,将临时节点删除从而释放锁。
- 可重入锁:借助于ThreadLocal
- 防误删:宕机自动释放临时节点,不需要设置过期时间,也就不存在误删问题。
- 加锁/解锁要具备原子性
- 单点问题:使用Zookeeper可以有效的解决单点问题,ZK一般是集群部署的。
- 集群问题:zookeeper集群是强一致性的,只要集群中有半数以上的机器存活,就可以对外提供服务。
4.2.2、代码实现
- 多个请求同时添加一个相同的临时节点,只有一个可以添加成功。添加成功的获取到锁
- 执行业务逻辑
- 完成业务流程后,删除节点释放锁。
|