| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> RedLock加锁源码分析 -> 正文阅读 |
|
[大数据]RedLock加锁源码分析 |
目录 Redis获取锁首先了解下Redisson,获取RLock? RedissionLock
加锁操作
首先解析下Lock()方法,该方法不传递参数,主要方法?lockInterruptibly();下面进入该方法查看 public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); }public void lockInterruptibly(long leaseTime, TimeUnit unit) lockInterruptibly接着开始解析lockInterruptibly(long leaseTime, TimeUnit unit)方法
分步讲解: 1:进入方法 Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); }<T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) ?查看一下?tryAcquireAsync 方法 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { //对于 Lock()无参而言 leaseTime =-1 会走下一步 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //拿到获取剩下有效时长的Future RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>() { @Override public void operationComplete(Future<Long> future) throws Exception { if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } ttlRemainingFuture.addListener(new FutureListener<Long>(){}) commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()? 这个是获取看门狗超时时长? private long lockWatchdogTimeout = 30 * 1000; 默认时长3秒 接着看? <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //时间转换为毫秒 这里T是Long类型 要get()的结果就是锁的剩余有效时长单位毫秒 internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } 主要就是去执行下面这段luna脚本的,首选判断redis key是否存在,key就是getName(),也就是你使用锁的时候加锁的名称redission.getLock(lockName),第一个线程加锁肯定是不存在的可以继续操作,也就是set命令执行写操作 value就是getLockName(threadId)? ,然后设置有效时长
如果再往下面看就是执行 下面的逻辑的,这些暂不介绍 public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) { NodeSource source = getNodeSource(key); return evalAsync(source, false, codec, evalCommandType, script, keys, params); } ?真正执行的这段luna脚本?eval script args? ?会在redis里写入key USER_LOCK_KEY_1 value??46b48db8-9759-41c5-9108-ab1f41ee17a0:73? pexpire??30000 ?回到tryAcquireAsync方法,将key value写进redis后会返回一个RFuture,这东西继承了Future public interface RFuture<V> extends java.util.concurrent.Future<V>, CompletionStage<V> 然后添加线程监听器,监听锁的有效时间是否过时或者锁是否已经被释放 接着看下面这个方法 private void scheduleExpirationRenewal(final long threadId) 根据方法名就可以知道,该方法就是重新设置过期时间的,会根据当前的线程Id去获取线程加的锁,去查看锁的有效时间,根据定时任务去每次间隔三分之一的internalLockLeaseTim时间执行一次 ?再回到方法 <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)? 可以知道获取到ttlRemainingFuture后就会开启监听,启动线程一直去定时刷新锁的剩余时间,直到锁被释放 接着 看lockInterruptibly()方法 RFuture<RedissonLockEntry> future = subscribe(threadId); commandExecutor.syncSubscription(future); ?subscriber是redis的消息订阅模式,会执行PublishSubscribe类的方法 public RFuture<E> subscribe(final String entryName, final String channelName, final PublishSubscribeService subscribeService) { final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>(); final AsyncSemaphore semaphore = subscribeService.getSemaphore(channelName); final RPromise<E> newPromise = new RedissonPromise<E>() { @Override public boolean cancel(boolean mayInterruptIfRunning) { return semaphore.remove(listenerHolder.get()); } }; Runnable listener = new Runnable() { @Override public void run() { E entry = entries.get(entryName); if (entry != null) { entry.aquire(); semaphore.release(); entry.getPromise().addListener(new TransferListener<E>(newPromise)); return; } E value = createEntry(newPromise); value.aquire(); E oldValue = entries.putIfAbsent(entryName, value); if (oldValue != null) { oldValue.aquire(); semaphore.release(); oldValue.getPromise().addListener(new TransferListener<E>(newPromise)); return; } RedisPubSubListener<Object> listener = createListener(channelName, value); subscribeService.subscribe(LongCodec.INSTANCE, channelName, semaphore, listener); } }; semaphore.acquire(listener); listenerHolder.set(listener); return newPromise; } 后面就是简单的一个循环,在while循环里面调用tryAcquire 方法 更新锁的有效时间,最后取消订阅 while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { break; } // waiting for message if (ttl >= 0) { getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { getEntry(threadId).getLatch().acquire(); } } finally { unsubscribe(future, threadId); } ? unLockredission.unLock() public void unlock() { Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId())); if (opStatus == null) { throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } if (opStatus) { cancelExpirationRenewal(); } // Future<Void> future = unlockAsync(); // future.awaitUninterruptibly(); // if (future.isSuccess()) { // return; // } // if (future.cause() instanceof IllegalMonitorStateException) { // throw (IllegalMonitorStateException)future.cause(); // } // throw commandExecutor.convertException(future); } 执行redis del 语句 protected RFuture<Boolean> unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId)); } 取消定时任务续命锁的有效时间 void cancelExpirationRenewal() { Timeout task = expirationRenewalMap.remove(getEntryName()); if (task != null) { task.cancel(); } } 记住上面在执行 tryAcquireAsync方法的时候,对于leaseTime是否传入值,会走不同的方法,再看下 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.addListener(new FutureListener<Long>(){ ......????????if (ttlRemaining == null) { ???????? scheduleExpirationRenewal(threadId); ????????} ?????} ? } return ttlRemainingFuture; } 如果你使用的是这个方法? lock.lock(10,TimeUnit.SECONDS);是不会启动看门狗后台任务刷新锁的有效时间的为初始值30秒的 下面的两个方法可以获取看门狗监听任务刷新锁有效时间 lock.lockInterruptibly(); lock.tryLock(10,TimeUnit.SECONDS); 表示多线程下不同线程去强占锁的时候会等待10秒,如果10秒钟没有或得锁,就获取不到,需要等待,如果10秒内能够获取到锁就直接执行,不同于Lock(),这个没有等待时间 |
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/16 6:46:35- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |