背景介绍
发现还没有写过redis的文章,头脑风暴一波,也没回想起什么知识点,打算写篇文章记录,便于日后复习。 只要提到redis,想到的就是缓存。因为他是基于内存存储的,是数据都是存在内存中的,为了数据不丢失,也会把数据持久化在磁盘中。除了缓存,redis还可以其他的用处,比如消息队列。这些用处源于redis支持的数据结构,所以开始先从redis支持的数据结构开始介绍。
redis的数据结构
redis中的数据结构的区分主要在于value,key统一是字符串。
redis中的基本命令
以下命令参考均来自菜鸟教程,该链接中已经对每个命令进行了详细介绍。本文中就不再过多的介绍了,有一个重要的:
- del key:删除键
- exists key:判断是否存在该key
- expire key:给该key设置过期时间
- persist key:取消该key的过期时间
- TTL key:查看该key的剩余时间(该key不存在返回-2,没有设置过期时间返回-1,其余的返回剩余的过期时间)
- keys *:返回匹配表达式的key
- type key:返回key的类型
String类型
value是String类型,保存的是字符串。最大支持512M。
基本命令
分析一些常用的命令 SET key value:设置指定 key 的值 GET key:获取指定 key 的值。如果key不是String类型,或报错。 MGET key1 [key2…]:获取所有(一个或多个)给定 key 的值。 STRLEN key:返回 key 所储存的字符串值的长度。 MSET key value [key value …]:同时设置一个或多个 key-value 对。
INCR key:将 key 中储存的数字值增一,返回结果值,如果不是数字,则会报错。
INCRBY key increment:将 key 所储存的值加上给定的增量值(increment) 。 DECR key:将 key 中储存的数字值减一。
APPEND key value:如果 key 已经存在并且是一个字符串, APPEND 命令将指定的 value 追加到该 key 原来值(value)的末尾。 基本的命令介绍完了,可以设值和取值,并且可以操作区间的字符串。深入一点,底层结构是什么样子的呢?下一模块介绍。
List数据结构
基本命令
列表类型,是一些数据的集合。一个key可以保存多个数据。可以理解为java中的list。**注意列表的第一个元素是左边的的元素,最后的元素在右边。**列表算是有序,只不过这里的顺序是插入的顺序,而不是数据的排序。
命令不难理解,就不再详细分析,直接去链接中看就好,里面都有详细的例子。
Set数据结构
基本命令
set的直译是集合,集合的定义是不能有重复数据的,所以set这种数据结构是不会有重复数据的。 set中有一个是比list多出来的功能,就是可以判断一个元素是否在集合中,sismember key member命令。
Sorted Set数据结构
有序集合,两个关键的概念,有序:根据用户提供的数据进行排序。集合:不能有重复的数据。 注意:
- ByLex后缀的是对值进行判断,可以自定义区间
- zrange是对score进行判断,都是取得是闭区间
- zrangeByScore是对score进行判断,可以自定义区间
值是可以自定义区间,而score只有明确指定命令,才可以自定义区间,否则就是闭区间。
Hash数据结构
基本命令
Hash的命令很近单,和String数据结构的命令相似。
注意:了解了这五种数据结构,他们key或者value都是字符串,不能保存类似于这样的结构:List<List> ,只能是将里面的list进行序列化成字符串之后放在List数据结构中。
redis的持久化
为了保证数据的安全,redis会将内存中的数据持久化在磁盘中,其中有两种方式:1,RDB:保存的是数据;AOF:保存的是之前执行过的命令。
RDB持久化
RDB方式保存的是数据,也就是说当触发了持久化之后,redis会将当前时刻的所有数据保存在磁盘中。当然持久化不会占用执行命令的线程,而是单独fork一个新的进程用来数据,将数据放在一个临时文件中,保存完毕之后替换原来的。 什么时候会触发持久化呢?
- 配置文件中配置,意思:每60秒如果有5次的更新数据,就会触发事件
- 手动触发,bgsave,save
- 退出redis时。
聊聊redis是如何保存rdb文件的 用户可以使用命令save,或者bgsave触发rdb生成机制。redis是怎么做的呢?这里面有一个为了避免持久化时主线程接收命令。redis会对内存中的数据做了一份副本。子进程持久化的是副本数据,而主进程操作的是真正的数据。这样就不会阻塞主进程了。这个技术还有个专属名字叫copy-on-write,读写分离。理解了底层机制,就更好吗,明白了rdb又叫快照的原因。
问题: 这种机制会导致会丢数据(如果在两次触发持久化中间,redis宕机,那么数据还没有来得及保存,导致直接丢失)。 优点: RDB是保存的数据,所以恢复数据的时候更快,直接加载到内存就可以了。
AOF持久化
AOF方式保存的是执行的命令。如果redis宕机,加载AOF文件,“回放”执行命令,同样可以恢复数据。 底层原理:
- 执行命令和文件写入的顺序
AOF涉及到了执行命令,而命令执行和文件的写入有一个顺序问题,到底是谁先呢?redis选用的是先执行命令,后写入文件。这样做的好处避免了命令的校验问题从而阻塞了该条命令的执行。
- 刷新磁盘的机制
在内存刷新到磁盘的过程中,并非一次操作就刷新一次,系统的操作是放在了缓冲区中,在缓冲区满或者时间超时才会刷新到磁盘中。刷新的机制影响了数据一致性的问题。redis提供了三种选择:摘自:码哥字节
问题: 由于是命令的追加,不可避免的就是文件会越来越大,为了解决该问题,redis会重写aof文件(命令的合并)来缩减文件大小。第二个问题,由于文件大,恢复数据时,速度会慢于rdb。 AOF文件重写: redis会fork出一个子进程用来文件重写,期间的命令会积累在缓存(包括新的aof缓冲)中,同时写入老的aof文件,待重写完成后,将重写缓冲的命令集合追加入新的文件。 优点: 可以尽量小的避免数据的丢失。
两种持久化的选择
两者都开启,AOF的作用更侧重持久化,而RDB的作用更侧重于备份。注意:redis重启后,会优先加载aof文件,因为aof文件的数据完整性更高。高版本的redis提供了一种机制,持久化使用rdb,而在期间的命令是记录在AOF文件中。
redis的高可用
如果单台redis宕机,服务将不可用,为了解决这个问题,redis为我们提供了多个解决方案。
主从
redis的主从模式,顾名思义就是一个主节点,多个从节点。将主节点的数据,备份在多个从节点中。在配置后,主机可负责读写服务,从机只负责读。redis 提供这种配置方式,为的是让其支持数据的弱一致性,即最终一致性(如果在部分同步时,主机宕机,那么从节点中的数据显然是不一致的。)。在业务中,选择强一致性还是弱一致性,应该取决于具体的业务需求,像微博,完全可以使用弱一致性模型;像淘宝,可以选用强一致性模型。 配置 默认情况下,一个redis就是主节点。只要配置从节点即可。 从节点配置有三种方法:
- 在从节点的配置文件中:replicaof
- 启动redis服务的时候,追加参数:redis-server --replicaof
- 客户端发送名的形式:replicaof
积压空间 在《深入剖析 redis AOF 持久化策略》中,介绍了更新缓存的概念,举一个例子:客户端发来命令:set name Jhon,这一数据更新被记录为:*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nJhon\r\n,并存储在更新缓存中。
同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它成为积压空间。
更新缓存存储在 server.repl_backlog,redis 将其作为一个环形空间来处理,这样做节省了空间,避免内存再分配的情况。 积压空间中的数据变更记录是什么时候被写入的?在执行一个 redis 命令的时候,如果存在数据的修改(写),那么就会把变更记录传播。redis 源码中是这么实现的:call()->propagate()->replicationFeedSlaves()
注释:命令真正执行的地方在 call() 中,call() 如果发现数据被修改(dirty),则传播 propagrate(),replicationFeedSlaves() 将修改记录写入积压空间和所有已连接的从机。
这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?为什么不仅仅将数据分发给所有从机呢?
因为有一些从机会因特殊情况(???)与主机断开连接,注意从机断开前有暂存主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。从 replicationFeedSlaves() 实现来看,在线的 slave 能马上收到数据更新记录;因某些原因暂时断开连接的 slave,需要从积压空间中找回断开期间的数据更新记录。如果断开的时间足够长,master 会拒绝 slave 的部分同步请求,从而 slave 只能进行全同步。
主从同步机制 redis 主从同步有两种方式(或者所两个阶段):全同步和部分同步。
主从刚刚连接的时候,进行全同步;全同步结束后,进行部分同步。当然,如果有需要,slave 在任何时候都可以发起全同步。redis 策略是,无论如何,首先会尝试进行部分同步,如不成功,要求从机进行全同步,并启动 BGSAVE……BGSAVE 结束后,传输 RDB 文件;如果成功,允许从机进行部分同步,并传输积压空间中的数据。
全同步 接着自动发起 PSYNC 请求 master 进行全同步。无论如何,redis 首先会尝试部分同步,如果失败才尝试全同步。而刚刚建立连接的 master-slave 需要全同步。
从机连接主机后,会主动发起 PSYNC 命令,从机会提供 master_runid 和 offset,主机验证 master_runid 和 offset 是否有效?master_runid 相当于主机身份验证码,用来验证从机上一次连接的主机,offset 是全局积压空间数据的偏移量。 验证未通过则,则进行全同步:主机返回 +FULLRESYNC master_runid offset(从机接收并记录 master_runid 和 offset,并准备接收 RDB 文件)接着启动 BGSAVE 生成 RDB 文件,BGSAVE 结束后,向从机传输,从而完成全同步。
部分同步 如上所说,无论如何,redis 首先会尝试部分同步。部分同步即把积压空间缓存的数据,即更新记录发送给从机。
从机连接主机后,会主动发起 PSYNC 命令,从机会提供 master_runid 和 offset,主机验证 master_runid 和 offset 是否有效? 验证通过则,进行部分同步:主机返回 +CONTINUE(从机接收后会注册积压数据接收事件),接着发送积压空间数据。
暂缓主机 从机因为某些原因,譬如网络延迟(PING 超时,ACK 超时等),可能会断开与主机的连接。这时候,从机会尝试保存与主机连接的信息,譬如全局积压空间数据偏移量等,以便下一次的部分同步,并且从机会再一次尝试连接主机。注意一点,如果断开的时间足够长, 部分同步肯定会失败的。
总结 简单来说,主从同步就是 RDB 文件的上传下载;主机有小部分的数据修改,就把修改记录传播给每个从机
哨兵
搭建 直接参考码哥字节
原理 参考码哥字节 哨兵,顾名思义就是找个放哨的,有问题出现的时候通知一声。redis的哨兵模式主要干三件事:
- 监控主从节点的状态(是否正常服务)
- 如果主节点宕机,选举出一个从节点成为主节点,并通知所有的从节点
- 通知客户端,主节点已经换了。(客户端无需关心主节点,客户端只与哨兵构建联系)
下面分别详细分析这三个步骤:
监控节点
哨兵是如何监控节点的呢?哨兵会给节点发送PING命令,如果返回的值不是指定的值或者超时返回,都会判定该节点已经下线(指定的值是:返回 +PONG、-LOADING、-MASTERDOWN 任何一种)。
为了避免错判,即可能只有该哨兵和节点的连接出现问题,其他的哨兵和节点都连接正常,在判定该节点真正下线之前,需要超过一个阈值的个数的哨兵个数,如果都认为该节点已经下线,则判定该节点下线,这个阈值是可以自定义的。
最后的2是阈值,大于等于2个就会认为该节点下线。
sentinel monitor mymaster 127.0.0.1 6379 2
哨兵是如何监控从节点的呢? 在配置的时候,发现只配置的了主节点的iP+port,如何监控从节点的呢?哨兵会给主节点发送命令INFO,查看其中从节点的信息,之后与从节点建立连接。
哨兵间是怎么沟通的呢? 配置文件中也没有配置其他哨兵的信息,它们之间是如何交换信息的呢?使用到了信息订阅发布技术。每个哨兵都会和主节点连接,并订阅消息,通道中的信息每个哨兵都可见。
选举新的主节点 当一个哨兵认为主节点已经下线,会在通道中发消息,如果阈值个数的哨兵也认为下线则判断该主节点下线,并从从节点选出一个。
切换主节点 这么多的哨兵,谁来切换主从呢?这也是需要选举产生了,只要一个哨兵的选举票数超过阈值,那么它就可以做这个操作。
通知客户端主节点已经切换 客户端和哨兵之间也是通过消息订阅发布进行消息沟通的。如果切换了主节点,会发布消息,客户端只要订阅通道就可以了。
集群
搭建 参考:码哥字节
原理 redis集群采用的是去中心化的思想,不再一个节点保存所有的数据,而是每个节点保存一部分的数据。redis用槽来保存数据,类似于Map,一共16384个槽,每个节点维护一部分槽。
数据key是如何映射到槽位的呢? key通过CRC16算法,生成一个16位的数,之后对16384取模,得到对应的槽位。
如何知道节点维护的槽位呢? 创建集群的时候,默认是平分。同时支持用户的手动分配。
集群中是如何知道其他节点维护的槽位呢? 集群中的节点相互连接,通过Gossip协议,发送自己维护的槽位,每个节点保存其他节点维护的信息。类似于两个数组(16384长度):数组1是保存的是维护该槽的节点,第二个数组是保存的是该节点维护的的槽。
手动变更槽位 这个其实也比较简单,当我们向Redis集群中的某个节点发送CLUSTER ADDSLOTS命令时,当前节点首先会通过clusterState中的slots数组来确认指派给当前节点的槽是否没有指派给其他节点,如果已经指派了,那么会直接抛出异常,返回错误给指派的客户端。如果指派给当前节点的所有槽都未指派给其他节点,那么当前节点会将这些槽指派给自己。指派主要有三个步骤:
- 更新clusterState的slots数组,将指定槽slots[i]指向当前clusterNode
- 更新clusterNode的slots数组,将指定槽slots[i]处的值更新为1
- 向集群中的其他节点发送消息,将clusterNode的slots数组发送给其他节点,其他节点接收到消息后也更新对应的clusterState的slots数组和clusterNode的slots数组
参考文章: 码哥字节 Redis集群详述——从服务内部讲解
redis中真的有事务吗?
我记得redis是没有事务的呀,怎么有看到许多文章都是介绍redis事务的呢?记住一点,有疑问就去到官网上找答案,博客中的东西不一定准确,(我的观点是真正的大佬应该没有时间写博客。况且大部分博客都是拼凑起来的东西,左抄抄,右抄抄,当然我写的博客也是这样,我想表达的意思学习一样技术,上官网,之后才结合博客。),到了官网一看,我去,真的有事务:redis官网 用我的n年前的四级水平翻一下:可以执行一组命令,作为一个单一事务。好吧,我记错了了,下面开始学习下吧。
基本使用
官网中解释了redis中事务可以做哪些事情: 涉及到了redis事务的基本命令:MULTI,EXEC,DISCARD,WATCH。它实现了一步执行一组命令,同时保证了以下两点:
- 事务中的命令都是串行的,并且顺序执行的。不会发生在执行本事务的时候,突然切换来服务另一个客户端的请求。保证了事务间的隔离性。
- 要么所有命令都执行,要么都不执行。redis事务是原子的。如果在执行exec命令之前,客户端断开连接了,那么命令都不会得到执行;相反,如果exec成功执行,那么里面的一组操作也会全部执行。
官网是这些的,但是学完基本使用之后你会发现好像有点问题,具体什么问题,学完再说。
正常执行
> mget test1 test2 test3
str
1
1
> multi
OK
> incr test2
QUEUED
> incr test3
QUEUED
> exec
2
2
首先是使用命令multi ,作为启动一个事务操作,之后执行一组命令,最后执行exec命令,执行这一组命令。一组命令中,返回都是QUEUED,说明成功放入到redis中的执行队列中了。如果是不想要这个事务了,执行discard命令,丢弃这个事务。
事务内部发生了错误
- 没有进入到命令队列中
在我们每编写一个命令,会有一个回馈,QUEUED,说明该命令已经进入命令执行的队列。如果命令是错误的,那么将无法进入队列。执行exec的时候会由于这个错误丢失这个命令队列。
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr test2
QUEUED
127.0.0.1:6379> incr test3
QUEUED
127.0.0.1:6379> dest tst2
(error) ERR unknown command 'dest'
127.0.0.1:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> mget test2 test3
1) "2"
2) "2"
127.0.0.1:6379>
- 执行exec命令发生错误
多数是由于命令的逻辑错误,比如本例中的,使用list的命令操作一个String类型的key。错误的命令不会执行,但是正确的命令依旧会执行。
127.0.0.1:6379> mget test2 test3
1) "2"
2) "2"
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr test2
QUEUED
127.0.0.1:6379> lpush test2 str
QUEUED
127.0.0.1:6379> incr test3
QUEUED
127.0.0.1:6379> exec
1) (integer) 3
2) (error) WRONGTYPE Operation against a key holding the wrong kind of val
3) (integer) 3
127.0.0.1:6379>
不回滚的原因
想想很奇怪,原子性不就是要么都执行,要么都不执行,你这执行了一半,还说自己是原子的。不是很奇怪吗?作者给出了原因:
- 多数的失败都是命令的失败,而编写命令的是开发者。所以只要开发者能编写正确的命令,就无需回滚操作。
- redis核心是快速,小巧,无需回滚的能力。
对比关系型数据库,由于业务的异常,导致了数据库的回滚。和redis一样。那么如果业务逻辑都是正常的,就无须回滚了。好像关系型数据库是有一个兜底的操作(回滚),而redis则需要完全的正确。否则会影响业务的一致性。
乐观锁
由于exec是真正的执行命令,所以在发送前面一组命令时,如果key的预期值已经改变,则不再执行该事务。WATCH机制更多的监控key,如果在执行exec命令前已经改变,就丢弃事务了。也就是说保证了整个过程的原子性。从watch key到exec的过程。
ACID的讨论
- 原子性:由于redis存在部分执行的情况,所以不具备原子性
- 隔离性:事务执行中,其他的命令请求不能将其打断,具备隔离性
- 持久性:如果使用rdb,在备份前宕机,那么数据将无法持久化在磁盘中,不具备持久性
- 一致性:前三个特性来保证一致性。三个缺一不可,所以一致性一定是无法保证的。例如:部分执行,如果前后命令有逻辑关系,一个执行了,一个没执行,业务的一致性是得不到保证的。
与springboot整合
pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
自动配置类分析
org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({ LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class })
public class RedisAutoConfiguration {
@Bean
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory)
throws UnknownHostException {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
redis连接池的切换
SpringBoot2.0默认采用Lettuce客户端来连接Redis服务端的。默认是不使用连接池的,只有配置 redis.lettuce.pool下的属性的时候才可以使用到redis连接池。
spring:
redis:
database: 0
host: 192.168.65.135
port: 6379
password: 123456
timeout: 10000ms
lettuce:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
没有这个配置时 增加这个配置时 同时,使用连接池,要依赖commons-pool2
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
如果没有引入,会报错
如果你想使用jedis客户端,则需要配置,将lettuce变成jedis
spring:
redis:
database: 0
host: 192.168.65.135
port: 6379
password: 123456
timeout: 10000ms
jedis:
pool:
max-active: 8
max-wait: -1ms
max-idle: 8
min-idle: 0
但是有一点要注意,依赖包的引用里,要去掉lettuce,并且加上jedis的依赖包,否则都是走的lettuce客户端
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
jedis的客户端默认增加了pool的连接池依赖包,所以Jedis默认你配置与否都会有连接池,而lettuce则需要配置文件中配置一下
redis自定义序列化方式
redis默认采用的是JDK的方式; 一般开发中,保存的是对象,最好使用json形式序列化对象。
@Configuration
public class RedisConfig {
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
分布式锁
为什么有分布式锁?当前分布式架构流行,所以一个服务有多个节点。但是节点中的代码中的锁只能限制线程,多个节点间没有限制,所以出现了限制进程的锁。 redis可以实现分布式锁。需要注意以下几点:
- 设置过期时间,防止无法释放锁的情况。
- 释放锁的时候,要进行校验,防止释放了别人加的锁。
什么时候会出现以上两种情况呢?1,没有设置过期时间,一旦业务逻辑出错,无法执行释放锁逻辑。那么将一直锁定。2,如果业务时间过长,超过了过期时间,锁释放了,第二个线程进来,执行业务,此时,业务1执行完了,要释放锁了,但是它的锁已经释放了,它要是再释放,就是释放别人的锁了,所以要加验证。
实现了重入锁功能
package com.ztryou.springboot.config;
import com.ztryou.springboot.utils.SpringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
@Service
public class RedisLock {
@Autowired
private RedisTemplate redisTemplate;
private static final String PREFIX = UUID.randomUUID().toString();
private long expireTime = 50 * 1000;
private long waitTime = 5000;
private String keyName;
public String getKeyName() {
return keyName;
}
public void setKeyName(String keyName) {
this.keyName = keyName;
}
public boolean lock(String key) {
setKeyName(key);
while (true) {
Long ttl = tryAcquire(key, -1, null);
if (ttl == null) {
return true;
}
Thread.yield();
}
}
private String getHashKey() {
return PREFIX + ":" + Thread.currentThread().getId();
}
public Long tryAcquire(String key, long timeOut, TimeUnit timeUnit) {
long leaseTime = expireTime;
if (timeOut != -1) {
leaseTime = timeUnit.toMillis(timeOut);
}
String script = "if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);";
DefaultRedisScript<Long> lua = new DefaultRedisScript(script);
lua.setResultType(Long.class);
Long ret = (Long) redisTemplate.execute(lua, Collections.singletonList(key), leaseTime, getHashKey());
return ret;
}
public void unlock() {
String script = "if (redis.call('exists', KEYS[1]) == 0) then " +
"return 1; " +
"end;" +
"if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then " +
"return 1;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 1; " +
"else " +
"redis.call('del', KEYS[1]); " +
"return 1; " +
"end; " +
"return 0;";
DefaultRedisScript<Long> lua = new DefaultRedisScript(script);
lua.setResultType(Long.class);
redisTemplate.execute(lua, Collections.singletonList(getKeyName()), getHashKey(), expireTime);
}
}
|