一、分布式锁介绍及使用
1.1 锁介绍
排它锁: 不允许多个程序(线程、进程)同时访问某个共享资源 共享锁: 又称为读锁,获得共享锁后,可以查看,但无法删除和修改数据, 其它线程此时也能获取到共享锁,也可以查看但是无法修改和删除数据 分布式锁: 实现跨进程共享资源的互斥(实质是排它锁)
1.2 分布式锁使用
1.2.1 配置
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>spring-boot-redis-client-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-redis-client-example</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
redisson.yml
singleServerConfig:
address: redis://127.0.0.1:6379
codec: !<org.redisson.codec.JsonJacksonCodec> {}
application.yml
spring:
redis:
redisson:
file: classpath:redisson.yml
1.2.2 应用
RedisLockMain.java
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public class RedisLockMain {
private static RedissonClient redissonClient;
static{
Config config=new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
redissonClient= Redisson.create(config);
}
public static void main(String[] args) throws InterruptedException {
RLock rLock=redissonClient.getLock("updateRepo");
for (int i = 0; i < 10; i++) {
if(rLock.tryLock()){ //返回true,表示获得锁成功
System.out.println("获得锁成功");
}else{
System.out.println("获得锁失败");
}
Thread.sleep(2000);
rLock.unlock();
}
}
}
二、Redis实现分布式锁的原理
通过redisson,非常简单就可以实现我们所需要的功能,当然这只是redisson的冰山一角,redisson最强大的地方就是提供了分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的并发程序工具包获得了协调分布式多机多线程并发系统的能力,降低了程序员在分布式环境下解决分布式问题的难度,下面分析一下RedissonLock的实现原理
2.1 RedissonLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws
InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//通过tryAcquire方法尝试获取锁
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { //表示成功获取到锁,直接返回
return true;
}
//省略部分代码....
}
2.2 tryAcquire
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime,
TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
//leaseTime就是租约时间,就是redis key的过期时间。
if (leaseTime != -1) { //如果设置过期时间
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit,
threadId, RedisCommands.EVAL_LONG);
} else {//如果没设置了过期时间,则从配置中获取key超时时间,默认是30s过期
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId,
RedisCommands.EVAL_LONG);
}
//当tryLockInnerAsync执行结束后,触发下面回调
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //说明出现异常,直接返回
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次设置锁键
if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,
并返回
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,启动Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
2.3 tryLockInnerAsync
通过lua脚本来实现加锁的操作
- 判断lock键是否存在,不存在直接调用hset存储当前线程信息并且设置过期时间,返回nil,告诉客户端直接获取到锁。
- 判断lock键是否存在,存在则将重入次数加1,并重新设置过期时间,返回nil,告诉客户端直接获取到锁。
- 被其它线程已经锁定,返回锁有效期的剩余时间,告诉客户端需要等待。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),
unit.toMillis(leaseTime), getLockName(threadId));
}
2.4 unlock释放锁流程
释放锁的流程,脚本看起来会稍微复杂一点
- 如果lock键不存在,通过 publish 指令发送一个消息表示锁已经可用。
- 如果锁不是被当前线程锁定,则返回nil
- 由于支持可重入,在解锁时需要将重入次数减1
- 如果计算后的重入次数>0,则重新设置过期时间
- 如果计算后的重入次数<=0,则发消息说锁已经可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1],
ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
2.5 RedissonLock有竞争的情况
有竞争的情况在redis端的lua脚本是相同的,只是不同的条件执行不同的redis命令。当通过tryAcquire发现锁被其它线程申请时,需要进入等待竞争逻辑中
- this.await返回false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败。
- this.await返回true,进入循环尝试获取锁。
继续看RedissonLock.tryLock后半部分代码如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws
InterruptedException {
//省略部分代码
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 订阅监听redis消息,并且创建RedissonLockEntry
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的结果对象,如果subscribe方法调用超过了time,说明已经超
过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) { //取消订阅
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId); //表示抢占锁失败
return false; //返回false
}
try {
//判断是否超时,如果等待超时,返回获取锁失败
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//通过while循环再次尝试竞争锁
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //竞争锁,
返回锁超时时间
// lock acquired
if (ttl == null) { //如果超时时间为null,说明获得锁成功
return true;
}
//判断是否超时,如果超时,表示获取锁失败
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 通过信号量(共享锁)阻塞,等待解锁消息. (减少申请锁调用的频率)
// 如果剩余时间(ttl)小于wait time ,就在 ttl 时间内,从Entry的信号量获取
一个许可(除非被中断或者一直没有可用的许可)。
// 否则就在wait time 时间范围内等待可以通过信号量
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl,
TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time,
TimeUnit.MILLISECONDS);
}
// 更新等待时间(最大等待时间-已经消耗的阻塞时间)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { //获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId); //取消订阅
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
2.6 锁过期了怎么办
一般来说,我们去获得分布式锁时,为了避免死锁的情况,我们会对锁设置一个超时时间,但是有一种情况是,如果在指定时间内当前线程没有执行完,由于锁超时导致锁被释放,那么其他线程就会拿到这把锁,从而导致一些故障。为了避免这种情况,Redisson引入了一个Watch Dog机制,这个机制是针对分布式锁来实现锁的自动续约,简单来说,如果当前获得锁的线程没有执行完,那么Redisson会自动给Redis中目标key延长超时时间。 默认情况下,看门狗的续期时间是30s,也可以通过修改Config.lockWatchdogTimeout来另行指定。
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException
{
return tryLock(waitTime, -1, unit); //leaseTime=-1
}
实际上,当我们通过tryLock方法没有传递超时时间时,默认会设置一个30s的超时时间,避免出现死锁的问题。
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime,
TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit,
threadId, RedisCommands.EVAL_LONG);
} else { //当leaseTime为-1时,leaseTime=internalLockLeaseTime,默认是30s,表示当
前锁的过期时间。
//this.internalLockLeaseTime =
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId,
RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //说明出现异常,直接返回
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次设置锁键
if (leaseTime != -1) { //表示设置过超时时间,更新internalLockLeaseTime,
并返回
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,启动Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
由于默认设置了一个30s的过期时间,为了防止过期之后当前线程还未执行完,所以通过定时任务对过期时间进行续约。
- 首先,会先判断在expirationRenewalMap中是否存在了entryName,这是个map结构,主要还是判断在这个服务实例中的加锁客户端的锁key是否存在,
- 如果已经存在了,就直接返回;主要是考虑到RedissonLock是可重入锁。
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry =
EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {// 第一次加锁的时候会调用,内部会启动WatchDog
entry.addThreadId(threadId);
renewExpiration();
}
}
定义一个定时任务,该任务中调用 renewExpirationAsync 方法进行续约。
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//用到了时间轮机制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new
TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync续约租期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + "
expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行
ee.setTimeout(task);
}
执行Lua脚本,对指定的key进行续约。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
三、LUA
Lua是一个高效的轻量级脚本语言(和JavaScript类似),用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。Lua在葡萄牙语中是“月亮”的意思,它的logo形式卫星,寓意是Lua是一个“卫星语言”,能够方便地嵌入到其他语言中使用;其实在很多常见的框架中,都有嵌入Lua脚本的功能,比如OpenResty、Redis等。 使用Lua脚本的好处:
- 减少网络开销,在Lua脚本中可以把多个命令放在同一个脚本中运行
- 原子操作,redis会将整个脚本作为一个整体执行,中间不会被其他命令插入。换句话说,编写脚本的过程中无需担心会出现竞态条件
- 复用性,客户端发送的脚本会永远存储在redis中,这意味着其他客户端可以复用这一脚本来完成同样的逻辑
3.1 Lua的下载和安装
Lua是一个独立的脚本语言,所以它有专门的编译执行工具,下面简单带大家安装一下。
- 下载Lua源码包: https://www.lua.org/download.html https://www.lua.org/ftp/lua-5.4.3.tar.gz
- 安装步骤如下
tar -zxvf lua-5.4.3.tar.gz
cd lua-5.4.3
make linux
make install
如果报错,说找不到readline/readline.h, 可以通过yum命令安装
yum -y install readline-devel ncurses-devel
最后,直接输入 lua 命令即可进入lua的控制台。Lua脚本有自己的语法、变量、逻辑运算符、函数等,简单演示两个案例如下。
array = {"Lua", "mvp"}
for i= 0, 2 do
print(array[i])
end
array = {"mvp", "redis"}
for key,value in ipairs(array)
do
print(key, value)
end
3.2 Redis与Lua
Redis中集成了Lua的编译和执行器,所以我们可以在Redis中定义Lua脚本去执行。同时,在Lua脚本中,可以直接调用Redis的命令,来操作Redis中的数据。
redis.call(‘set’,'hello','world')
local value=redis.call(‘get’,’hello’)
redis.call 函数的返回值就是redis命令的执行结果,前面我们介绍过redis的5中类型的数据返回的值的类型也都不一样,redis.call函数会将这5种类型的返回值转化对应的Lua的数据类型,在很多情况下我们都需要脚本可以有返回值,毕竟这个脚本也是一个我们所编写的命令集,我们可以像调用其他redis内置命令一样调用我们自己写的脚本,所以同样redis会自动将脚本返回值的Lua数据类型转化为Redis的返回值类型。 在脚本中可以使用return 语句将值返回给redis客户端,通过return语句来执行,如果没有执行return,默认返回为nil。
3.3 Redis中执行Lua脚本相关的命令
编写完脚本后最重要的就是在程序中执行脚本。Redis提供了EVAL命令可以使开发者像调用其他Redis内置命令一样调用脚本。
3.3.1 EVAL命令-执行脚本
[EVAL] [脚本内容] [key参数的数量] [key …] [arg …] 可以通过key和arg这两个参数向脚本中传递数据,他们的值可以在脚本中分别使用KEYS和ARGV 这两个类型的全局变量访问。 比如我们通过脚本实现一个set命令,通过在redis客户端中调用,那么执行的语句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述脚本相当于使用Lua脚本调用了Redis的 set 命令,存储了一个key=lua,value=hello到Redis中。
3.3.2 EVALSHA命令
考虑到我们通过eval执行lua脚本,脚本比较长的情况下,每次调用脚本都需要把整个脚本传给redis,比较占用带宽。为了解决这个问题,redis提供了EVALSHA命令允许开发者通过脚本内容的SHA1摘要来执行脚本。该命令的用法和EVAL一样,只不过是将脚本内容替换成脚本内容的SHA1摘要
- Redis在执行EVAL命令时会计算脚本的SHA1摘要并记录在脚本缓存中
- 执行EVALSHA命令时Redis会根据提供的摘要从脚本缓存中查找对应的脚本内容,如果找到了就执行脚本,否则返回“NOSCRIPT No matching script,Please use EVAL”
script load "return redis.call('get','lua')"
evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
3.3.3 Redisson执行Lua脚本
通过lua脚本来实现一个访问频率限制功能。思路: 定义一个key,key中包含ip地址,value为指定时间内的访问次数,比如说是10秒内只能访问3次。
local times=redis.call('incr',KEYS[1])
-- 如果是第一次进来,设置一个过期时间
if times == 1 then
redis.call('expire',KEYS[1],ARGV[1])
end
-- 如果在指定时间内访问次数大于指定次数,则返回0,表示访问被限制
if times > tonumber(ARGV[2]) then
return 0
end
-- 返回1,允许被访问
return 1
@RestController
public class RedissonController {
@Autowired
RedissonClient redissonClient;
private final String LIMIT_LUA=
"local times=redis.call('incr',KEYS[1])\n" +
"if times == 1 then\n" +
" redis.call('expire',KEYS[1],ARGV[1])\n" +
"end\n" +
"if times > tonumber(ARGV[2]) then\n" +
" return 0\n" +
"end\n" +
"return 1";
@GetMapping("/lua/{id}")
public String lua(@PathVariable("id") Integer id) throws
ExecutionException, InterruptedException {
List<Object> keys= Arrays.asList("LIMIT:"+id);
RFuture<Object> future=redissonClient.getScript().
evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA,
RScript.ReturnType.INTEGER,keys,10,3);
return future.get().toString();
}
}
需要注意,上述脚本执行的时候会有问题,因为redis默认的序列化方式导致value的值在传递到脚本中时,转成了对象类型,需要修改 redisson.yml 文件,增加codec的序列化方式。 redisson.yml
singleServerConfig:
address: redis://127.0.0.1:6379
codec: !<org.redisson.codec.JsonJacksonCodec> {}
3.4 Lua脚本的原子性
redis的脚本执行是原子的,即脚本执行期间Redis不会执行其它命令。所有的命令必须等待脚本执行完以后才能执行。为了防止某个脚本执行时间过长导致Redis无法提供服务。Redis提供了lua-time-limit参数限制脚本的最长运行时间。默认是5秒钟。
3.4.1 非事务性操作
当脚本运行时间超过这个限制后,Redis将开始接受其他命令但不会执行(以确保脚本的原子性),而是返回BUSY的错误,下面演示一下这种情况。打开两个客户端窗口,在第一个窗口中执行lua脚本的死循环
eval "while true do end" 0
在第二个窗口中运行 get lua ,会得到如下的异常。
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
我们会发现执行结果是Busy, 接着我们通过script kill 的命令终止当前执行的脚本,第二个窗口的显示又恢复正常了。
3.4.2 存在事务性操作
如果当前执行的Lua脚本对Redis的数据进行了修改(SET、DEL等),那么通过SCRIPT KILL 命令是不能终止脚本运行的,因为要保证脚本运行的原子性,如果脚本执行了一部分终止,那就违背了脚本原子性的要求。最终要保证脚本要么都执行,要么都不执行,同样打开两个窗口,第一个窗口运行如下命令
eval "redis.call('set','name','mvp') while true do end" 0
在第二个窗口运行
get lua
结果一样,仍然是busy,但是这个时候通过script kill命令,会发现报错,没办法kill。
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到这种情况,只能通过shutdown nosave命令来强行终止redis。 shutdown nosave和shutdown的区别在于 shutdown nosave不会进行持久化操作,意味着发生在上一次快照后的数据库修改都会丢失。
3.5 Redisson的Lua脚本
了解了lua之后,我们再回过头来看看Redisson的Lua脚本,就不难理解了。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()),
unit.toMillis(leaseTime), getLockName(threadId));
}
四、Redis中的Pub/Sub机制
下面是Redisson中释放锁的代码,在代码中我们发现一个publish的指令 redis.call(‘publish’,KEYS[2], ARGV[1]) ,这个指令是干啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1],
ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Redis提供了一组命令可以让开发者实现“发布/订阅”模式(publish/subscribe) . 该模式同样可以实现进程间的消息传递,它的实现原理是:
- 发布/订阅模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或多个频道,而发布者可以向指定的频道发送消息,所有订阅此频道的订阅者都会收到该消息
- 发布者发布消息的命令是PUBLISH, 用法是
PUBLISH channel message
比如向channel.1发一条消息:hello
PUBLISH channel.1 “hello”
这样就实现了消息的发送,该命令的返回值表示接收到这条消息的订阅者数量。因为在执行这条命令的时候还没有订阅者订阅该频道,所以返回为0. 另外值得注意的是消息发送出去不会持久化,如果发送之前没有订阅者,那么后续再有订阅者订阅该频道,之前的消息就收不到了 订阅者订阅消息的命令是:
SUBSCRIBE channel [channel …]
该命令同时可以订阅多个频道,比如订阅channel.1的频道:SUBSCRIBE channel.1,执行SUBSCRIBE命令后客户端会进入订阅状态。 一般情况下,我们不会用pub/sub来做消息发送机制,毕竟有这么多MQ技术在。
五、时间轮机制
在前面分析的Redisson的分布式锁实现中,有一个Watch Dog机制来对锁键进行续约,代码如下:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//用到了时间轮机制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new
TimerTask() {
//添加一个任务到时间轮
//省略部分代码....
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次间隔租期的1/3时间执行
ee.setTimeout(task);
}
实际上是构建了一个TimerTask,通过 timer.newTimeout(task, delay, unit); 添加到时间轮中。
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
//delay: 延迟执行时间
//unit: 延迟执行时间单位
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}
private HashedWheelTimer timer;
5.1 时间轮概念
时间轮这个技术其实出来很久了,在kafka、zookeeper等技术中都有时间轮使用的方式。什么是时间轮呢? 简单来说: 时间轮是一种高效利用线程资源进行批量化调度的一种调度模型。把大批量的调度任务全部绑定到同一个调度器上,使用这一个调度器来进行所有任务的管理、触发、以及运行。所以时间轮的模型能够高效管理各种延时任务、周期任务、通知任务。 以后大家在工作中遇到类似的功能,可以采用时间轮机制。 如下图,时间轮,从图片上来看,就和手表的表圈是一样,所以称为时间轮,是因为它是以时间作为刻度组成的一个环形队列,这个环形队列采用数组来实现,数组的每个元素称为槽,每个槽可以放一个定时任务列表,叫HashedWheelBucket,它是一个双向链表,链表的每一项表示一个定时任务项(HashedWhellTimeout),其中封装了真正的定时任务TimerTask。 时间轮是由多个时间格组成,下图中有8个时间格,每个时间格代表当前时间轮的基本时间跨度(tickDuration),其中时间轮的时间格的个数是固定的。在下图中,有8个时间格(槽),假设每个时间格的单位为1s,那么整个时间轮走完一圈需要8s钟。每秒钟指针会沿着顺时针方向移动一个,这个单位可以设置,比如以秒为单位,可以以一小时为单位,这个单位可以代表时间精度。通过指针移动,来获得每个时间格中的任务列表,然后遍历这一个时间格中的双向链表来执行任务,以此循环。
5.2 时间轮使用
这里使用的时间轮是Netty这个包中提供的,使用方法比较简单。 1、先构建一个HashedWheelTimer时间轮: tickDuration: 100 ,表示每个时间格代表当前时间轮的基本时间跨度,这里是100ms,也就是指针100ms跳动一次,每次跳动一个窗格; ticksPerWheel: 1024,表示时间轮上一共有多少个窗格,分配的窗格多,占用内存空间就越大 leakDetection:是否开启内存泄漏检测。 maxPendingTimeouts[可选参数],最大允许等待的任务数,默认没有限制。 2、通过newTimeout()把需要延迟执行的任务添加到时间轮中
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@RestController
@RequestMapping("/timer")
public class HashedWheelController {
//时间轮的定义
HashedWheelTimer hashedWheelTimer=new HashedWheelTimer(
new DefaultThreadFactory("demo-timer"),
100, TimeUnit.MILLISECONDS,1024,false);
@GetMapping("/{delay}")
public void tick(@PathVariable("delay")Long delay){
//SCHEDULED(定时执行的线程)
//Timer(Java原生定时任务执行)
//订单关单
System.out.println("CurrentTime:"+new Date());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("Begin Execute:"+new Date());
},delay,TimeUnit.SECONDS);
}
}
5.3 时间轮的原理解析
时间轮的整体原理,分为几个部分。
- 创建时间轮
时间轮本质上是一个环状数组,比如我们初始化时间轮时:ticksPerWheel=8,那么意味着这个环状数组的长度是8,如下图所示。
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
- 添加任务,如下图所示
当通过newTimeout()方法添加一个延迟任务时,该任务首先会加入到一个阻塞队列中。然后会有一个定时任务从该队列获取任务,添加到时间轮的指定位置,计算方法如下。
//当前任务的开始执行时间除以每个窗口的时间间隔,得到一个calculated值(表示需要经过多少tick,指针每跳动一个窗格,tick会递增),单位为nanos(微毫秒)
long calculated = timeout.deadline / tickDuration;
//计算当前任务需要在时间轮中经历的圈数,因为当前任务执行时间有可能大于完整一圈的时间,所以需要计算经过几圈之后才能执行该任务。
timeout.remainingRounds = (calculated - tick) / wheel.length;
//取最大的一个tick,有可能当前任务在队列中已经过了执行时间,这种情况下直接用calculated这个值就没意义了。
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
int stopIndex = (int) (ticks & mask); //通过ticks取模mask,得到一个下标
HashedWheelBucket bucket = wheel[stopIndex]; //把任务添加到指定数组下标位置
- 任务执行
Worker线程按照每次间隔时间转动后,得到该时间窗格中的任务链表,然后从链表的head开始逐个取出任务,有两个判断条件 当前任务需要转动的圈数为0,表示任务是当前圈开始执行 当前任务达到了delay时间,也就是 timeout.deadline <= deadline 最终调用timeout.expire()方法执行任务。
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
if (timeout.remainingRounds <= 0) {
next = remove(timeout);
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should
never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)",
timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
timeout.remainingRounds --;
}
timeout = next;
}
}
|