IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> redis实现分布式锁 -> 正文阅读

[大数据]redis实现分布式锁

一、什么是分布式锁

1、分布式锁的概念

分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。

  • 多进程可见:多个JVM都能看得到。比如redis、mysql等,那么所有的JVM进程都能看得到
  • 互斥锁:只允许一个进程能拿到锁
  • 高可用:大多数情况下,获取锁都能获取成功
  • 高性能:本身加锁后,线程变成了串行执行,从而会影响性能。所以获取锁的步骤上应该高性能
  • 安全性:获取锁应该考虑异常的情况。获取锁后宕机怎么办? 死锁怎么办

2、分布式锁常见三种方式

分布式锁的核心是实现多进程之间互斥,而满足条件的并且常见的有三种:mysql、redis、zookeeper

  • mysq: 数据库都具有事务的机制,在执行事务操作时,mysql会自动分配一个互斥锁。所以说在事务之间是互斥的,只有一个人能去执行。在业务执行前,去数据库中申请一个互斥锁,然后再去执行业务,当业务执行结束后,提交事务,锁也就释放了。当业务抛出了异常时,会自动触发回滚,锁也就释放了
  • redis: 利用setnx互斥命令。 获取锁就在reids中setnx一条数据,如果没有该key,那么添加成功,随之获取锁成功,反之一样。当删除该key,就是释放锁成功。

在这里插入图片描述

二、基于redis实现分布式锁

1、Setnx的介绍

实现分布式锁时需要实现的两个基本方法:

获取锁:

  • 互斥:确保只能有一个线程获取锁

127.0.0.1:6379[1]> setnx lock thread1
(integer) 1 # 现在成功获取锁了
127.0.0.1:6379[1]> setnx lock thread1
(integer) 0 # 重复获取锁,发现获取失败

释放锁:

  • 手动释放:删除key

127.0.0.1:6379[1]> del lock
(integer) 1 # 删除key,就是手动释放锁
127.0.0.1:6379[1]> KEYS *
(empty array) # 发现key,已经没有了

  • 超时释放:在获取锁时加入过期时间。 可以避免服务宕机,然后死锁

127.0.0.1:6379[1]> set lock thread ex 8 nx
OK
127.0.0.1:6379[1]> ttl lock
(integer) 6
127.0.0.1:6379[1]> ttl lock
(integer) 4
127.0.0.1:6379[1]> ttl lock
(integer) 1
127.0.0.1:6379[1]> ttl lock
(integer) -2

2、 实现redis分布式锁

  1. 添加释放锁需要判断是否是当前线程,避免锁误删操作。
  2. 添加LUA脚本解决多条命令原子性问题

1.定义接口,利用redis实现分布式锁功能

尝试获取锁:是因为采用的是非阻塞式。获取锁只是获取一次。要么成功要么失败。

public interface ILock {

    /**
     * 尝试获取锁
     * @param timeoutSec = EX :锁持有的超时时间,过期后自动释放
     * @return true代表获取锁成功; false代表获取锁失败
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unlock();
}

2. 实现接口,具体实现获取锁和释放锁

1.在获取锁时存入线程标识(可以用UUID表示)

2.在释放锁时先获取锁中的线程标识,判断是否与当前线程标识一致。从而避免误删别人的锁

  • 如果一致则释放锁
  • 如果不一致则不释放锁
public class SimpleRedisLock implements ILock {

    private String name;
    private StringRedisTemplate stringRedisTemplate;

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
        this.name = name;
        this.stringRedisTemplate = stringRedisTemplate;
    }

    private static final String KEY_PREFIX = "lock:";
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

    @Override
    public boolean tryLock(long timeoutSec) {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁
        Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
        // 调用lua脚本
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX + Thread.currentThread().getId());
    }
    
    /*@Override
    public void unlock() {
        // 获取线程标示
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁中的标示
        String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
        // 判断标示是否一致
        if(threadId.equals(id)) {
            // 释放锁
            stringRedisTemplate.delete(KEY_PREFIX + name);
        }
    }*/
}

3.释放锁的lua脚本

-- 比较线程标示与锁中的标示是否一致
if(redis.call('get', KEYS[1]) ==  ARGV[1]) then
    -- 释放锁 del key
    return redis.call('del', KEYS[1])
end
return 0

三、基于Redisson实现分布式锁

1、Redisson的介绍

Redisson是开源的框架,在redis基础上实现的分布式工具的集合。而分布式锁只是Redisson的一个子集。

  • 每个Redis服务实例都能管理多达1TB的内存。

  • Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本

  • GitHub地址:https://github.com/redisson/redisson

  • 下面这个图是来自官网,可以实现对锁的功能
    在这里插入图片描述

四、SpringBoot集成Redisson

1、配置环境

  1. 引入依赖
<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.13.6</version>
</dependency>
  1. 配置Redisson客户端:
@Configuration
public class RedissonConfig {
    // redis的工厂类,可以从中拿到各种工具
    @Bean
    public RedissonClient redissonClient(){
        // 配置类
        Config config = new Config();
// 添加redis地址,这里添加了单点的地址,也可以使用config.useClusterServers()添加集群地址
        config.useSingleServer().setAddress("redis://192.168.75.111:6379").setPassword("123321");
        // 创建RedissonClient对象。创建客户端
        return Redisson.create(config);
    }
}

为什么不使用yml文件和start呢?
添加配置可以使用yml文件,跟springBoot整合来实现,官网还提供了start。
因为会替代spring提供的redis的配置和实现。
建议使用Redisson时,自己进行配置bean,不和spring提供的redis配置进行掺和。

2、测试


    @Resource
    private RedissonClient redissonClient;
    @Test
    void testRedisson() throws InterruptedException {
        // 获取锁(可重入),指定锁的名称
        RLock lock = redissonClient.getLock("anyLock");
     
      /* 尝试获取锁,参数分别是:
        	参数一:获取锁的最大等待时间(期间会重试)
        	参数二:锁自动释放时间,时间单位
		1. 无参模式:非阻塞式
			等待时间为-1,就是不等待。如果获取失败立即结束。
			自动释放为30秒钟,超时30秒后才会释放
		
		*/
        boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
        // 判断释放获取成功
        if(isLock){
            try {
                System.out.println("执行业务");
            }finally {
                // 释放锁
                lock.unlock();
            }
        }
    }

3、可重入锁的原理

1、什么是可重入锁?

可重入锁

  • 指的是同一个线程,可以多次获得一把锁。
  • 利用Hash结构记录线程id和重试次数。
  • 利用watchDog延续锁时间。
  • 利用信号量控制锁重试等待。

缺点:redis宕机引起锁失效问题

例如方法A调用方法B,在方法A中先去获得锁,然后执行业务去调用B,而B又要获取同一把锁。
而例如set key value nx time 就是不可重入锁,就会出现死锁的状态。例如:如果A获得锁后,去执行B,B如果也想获得锁,但是A并没有释放锁,所以说就会出现死锁状态。

2、获取锁和释放锁

需要Hash类型

  • key中记录锁的名称
  • field记录线程标识
  • value记录锁的重试次数。

获取锁和释放锁的流程:

  1. 创建锁的对象
  2. 在方法A中,获取锁,tryLock时记录锁的线程标识和重试次数为1
  3. 在方法B中,获取锁。如果是锁已经存在,并且是同一线程时,只需要在重试次数中加1。代表是第二次获取同一个锁。
  4. 在方法B或者方法A中,执行完业务,释放锁的逻辑是:需要把重试次数减1,并判断是否为0,如果为0则删除锁。
@SpringBootTest
class RedissonTest {

    @Resource
    private RedissonClient redissonClient;
    private RLock lock;
    @BeforeEach
    void setUp() {
        lock = redissonClient.getLock("order");
    }
    @Test
    void method1() throws InterruptedException {
        // 尝试获取锁
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }
    void method2() {
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 .... 2");
            return;
        }
        try {
            log.info("获取锁成功 .... 2");
            log.info("开始执行业务 ... 2");
        } finally {
            log.warn("准备释放锁 .... 2");
            lock.unlock();
        }
    }
}

4. 获取锁和释放锁的lua脚本

获取锁和释放锁一定要采用Lua脚本,来确保获取和释放锁的原子性

获取锁:

 local key = KEYS[1]; -- 锁的key
        local threadId = ARGV[1]; -- 线程唯一标识
        local releaseTime = ARGV[2]; -- 锁的自动释放时间
        -- 判断是否存在
        if(redis.call('exists', key) == 0) then
        -- 不存在, 获取锁
        redis.call('hset', key, threadId, '1');
        -- 设置有效期
        redis.call('expire', key, releaseTime);
        return 1; -- 返回结果
        end;
        -- 锁已经存在,判断threadId是否是自己
        if(redis.call('hexists', key, threadId) == 1) then
        -- 不存在, 获取锁,重入次数+1
        redis.call('hincrby', key, threadId, '1');
        -- 设置有效期
        redis.call('expire', key, releaseTime);
        return 1; -- 返回结果
        end;
        return 0; -- 代码走到这里,说明获取锁的不是自己,获取锁失败

释放锁:

        local key = KEYS[1]; -- 锁的key
        local threadId = ARGV[1]; -- 线程唯一标识
        local releaseTime = ARGV[2]; -- 锁的自动释放时间
        -- 判断当前锁是否还是被自己持有
        if (redis.call('HEXISTS', key, threadId) == 0) then
        return nil; -- 如果已经不是自己,则直接返回
        end;
        -- 是自己的锁,则重入次数-1
        local count = redis.call('HINCRBY', key, threadId, -1);
        -- 判断是否重入次数是否已经为0
        if (count > 0) then
        -- 大于0说明不能释放锁,重置有效期然后返回
        redis.call('EXPIRE', key, releaseTime);
        return nil;
        else -- 等于0说明可以释放锁,直接删除
        redis.call('DEL', key);
        return nil;
        end;

4、 Redisson的锁重试和WatchDog机制

1、什么可重试问题

可重试:利用信号量和PubSub【发布订阅】功能实现等待、唤醒、获取锁失败的重试机制。

第一次尝试获取锁失败以后,并不是立即失败,而是利用了redis的PubSub的机制,做一个等待,等待释放锁的消息。
而获取锁成功的线程,在释放锁中会发送一条释放锁的消息。从而会被正在等待的线程通过订阅机制捕获到。
当等到释放锁的消息后,就会重试机制。

不可重试: 获取锁只尝试一次就返回false。

boolean isLock = lock.tryLock();

tryLock()的参数:
long waitTime:获取锁的最大等待时常。当第一次获取锁失败后,不会立即返回false,而是在规定的时间内进行重试,直到超时才会返回falselong leaseTime:自动失效释放的时间
TimeUnit unit:时间单位

2、深入原码解释:

在这里插入图片描述

从获取锁这条命令开始往下执行:
boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime); //将传进来的时间,转换为毫秒
        long current = System.currentTimeMillis();//得到当前时间
        long threadId = Thread.currentThread().getId(); //得到线程的id,就是将来锁的标识
        Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId); //尝试获取锁。
        if (ttl == null) {
            return true; //如果获取成功,返回true
        } else {
        	/*
        	 *获取失败,就要再次获取:
        	*/
            time -= System.currentTimeMillis() - current; //判断是否超时重试时间
            if (time <= 0L) {
                this.acquireFailed(waitTime, unit, threadId);
                return false;
            } else {
                current = System.currentTimeMillis(); // 再次得到当前时间
                /*
                并没有立即去尝试。而是订阅subscribe 其他人释放锁的信号
				在释放锁时有这样的语句,用来发布信号:redis.call('piblis',KEY[2],ARGV[1]);" 
				*/
                RFuture<RedissonLockEntry> subscribeFuture = this.subscribe(threadId);
                // 当且仅当future在指定的时间限制内完成时为True
                // 等待time(锁的剩余等待时间),如果等到锁的时间过期,
                // 还没有等到释放锁的信号,就会返回获取锁失败
                if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
                    if (!subscribeFuture.cancel(false)) {
                        subscribeFuture.onComplete((res, e) -> {
                            if (e == null) {
                            	// 等待锁的重试超时时间,就取消订阅
                                this.unsubscribe(subscribeFuture, threadId);
                            }

                        });
                    }
                    this.acquireFailed(waitTime, unit, threadId);
                    return false;
			 try {		//计算剩余等待时间
                        time -= System.currentTimeMillis() - current;
                    	// 如果剩余等待时间小于0
                        if (time <= 0L) {
                            this.acquireFailed(waitTime, unit, threadId);
                            boolean var20 = false;
                            return var20;
                        } else {
                            boolean var16;
                            // 如果剩余等待时间大于0 。 进入do while循环
                            do {
                            	//得到当前时间
                                long currentTime = System.currentTimeMillis();
                                // 第一次去重试
                                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);
                                if (ttl == null) {
                                    var16 = true;
                                    return var16;
                                }
								// 如果获取失败,则看一下剩余时间
                                time -= System.currentTimeMillis() - currentTime;
                                if (time <= 0L) {
                                    this.acquireFailed(waitTime, unit, threadId);
                                    var16 = false;
                                    return var16;
                                }
								// 剩余时间如果还有
                                currentTime = System.currentTimeMillis();
                                // 采用信号量。在规定时间内,等待得到释放锁的信号量
                                // 如果ttl小于等待时间:说明在等待时锁就释放了,就等待ttl的时间
                                // 如果ttl大于等待时间:等待time的时间
                                if (ttl >= 0L && ttl < time) {
          
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                                } else {
                                    ((RedissonLockEntry)subscribeFuture.getNow()).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                                }
								
                                time -= System.currentTimeMillis() - currentTime;
                                // 如果等着ttl 到期后,time肯定还没有到期。那么就一直循环while,等待锁的释放信号
                            } while(time > 0L);
							
                            this.acquireFailed(waitTime, unit, threadId);
                            var16 = false;
                            return var16;
                        }
                    } finally { 
                        this.unsubscribe(subscribeFuture, threadId);
                    }
                }
            }
        }
    }

2、WatchDog机制

超时释放: 锁超时释放虽然可以避免死锁,但如果业务执行耗时较长,也会导致锁释放,存在安全隐患。

超时续约: 利用watchDog看门狗机制,每隔一段时间(releseTime/3),重置超时时间

  private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
        /*
        getConnectionManager:看门狗的时间,默认是30秒,去获取锁		
		*/
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            // 当future完成以后(剩余有效期,和异常)
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                	// 剩余有效期= null, 说明获取锁成功了
                    if (ttlRemaining == null) {
                    	// 任务调度  过期时间  : 自动续期功能 
                        this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
    }


this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);

五、分布式锁主从一致性问题

1、什么是主从一致性问题

如果redis提供了主从集群,主从同步存在延迟,挡住宕机后,如果从并同步中的锁数据,则会出现锁实现。

主节点:负责增删改
从节点:只负责读的问题
那么主节点会把数据同步到从节点中,但是同步时会存在延迟,即使延迟很短也是会存在。当获取锁后,主从数据还没有来及同步时,主节点宕机了。主备切换后,在新的master节点中,发现锁并不存在了。

2、Redisson如何解决一致性问题【MultiLock联锁】

原理:多个独立的redis节点,必须在所有节点都获取重入锁,才算获取锁成功
优点:所有锁中最安全的实现方法
缺点:运维成本高、实现复杂

既然主从关系是导致一致性问题的原因,那么Redisson取消主从,那么所有的节点都是独立的redisson节点,相互之间没有任何关系,都可以做读写操作。那么获取锁时,依次在多个节点中进行获取锁操作。
可用性问题: 即使某一个节点宕机后,那么其他节点都有锁的信息。
更高的可用性: 在每一个节点后面加入slave节点,做主从同步。

即使加入了主从同步,也不会出现安全问题。
假设某一台master宕机后,刚好并没有完成数据同步。那么slave变成了master主节点。没有锁标识。
有一个线程趁虚而入,想要获取锁,并不能获取成功。因为只有在每一个节点都拿到锁才能获取成功。
只要任意一个节点存活中,其他线程就不能拿到锁,就不会出现锁失效的问题。

优点:保留了主从 机制,确保了整个redis的高可用特性,避免了主从一致引发的锁失效问题。

在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-25 18:11:43  更:2022-06-25 18:15:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:44:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码