IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 如何用 Redis 实现一个分布式锁 -> 正文阅读

[大数据]如何用 Redis 实现一个分布式锁

场景模拟

一般电子商务网站都会遇到如团购、秒杀、特价之类的活动,而这样的活动有一个共同的特点就是访问量激增、上千甚至上万人抢购一个商品。然而,作为活动商品,库存肯定是很有限的,如何控制库存不让出现超买,以防止造成不必要的损失是众多电子商务网站程序员头疼的问题,这同时也是最基本的问题。

在秒杀系统设计中,超卖是一个经典、常见的问题,任何商品都会有数量上限,如何避免成功下订单买到商品的人数不超过商品数量的上限,这是每个抢购活动都要面临的难点。

针对大量的并发请求,我们可以通过 Redis 来抗,也就是说对于库存直接请求 Redis 缓存,不直接请求数据库,如在 Redis 中有 50 个库存,如下:

image-20220309110319351

但不管是缓存还是数据库,在不做任何处理的情况下,都会出现超买的问题,常见的处理方式就是在代码中通过JVM 加锁的方式,如下:

server1

@RestController
public class SkillController {

    @Autowired
    private RedisTemplate redisTemplate;

    // 秒杀接口
    @RequestMapping("/deduct_stock")
    public String deductStock() {

        // 加锁
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
            if (stock > 0) {
                // 库存 -1
                int realStock = stock - 1;
                // 扣减库存
                redisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }

        return "8080";
    }
}

当然,在单机情况下确实没有任何问题,但现在绝大多数系统都是分布式系统,就算是 ERP 系统也会部署 2 台机器防止单点故障,所以一般情况下一个请求如下:

image-20220309110832380

server2

server2 和 server1 代码基本相同,只是开启了 2 个 JVM 实例。

@RestController
public class SkillController {

    @Autowired
    private RedisTemplate redisTemplate;

    // 秒杀接口
    @RequestMapping("/deduct_stock")
    public String deductStock() {

        // 加锁
        synchronized (this) {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
            if (stock > 0) {
                // 库存 -1
                int realStock = stock - 1;
                // 扣减库存
                redisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        }

        return "8090";
    }
}

Nginx

一般来说,前端通过 nginx 请求转发并通过 upstream 实现负载均衡,其关键配置如下:

image-20220309111849631

Jmeter

我们这里通过 Jmeter 来进行并发压测,不会用的参考 Jmeter 使用,然后这里提供下载链接:Jmeter 下载 (提取码:2hyo)。

并发请求:1 s 内 200 个请求(模拟高并发),循环 5 次,一共 1000 个总请求。

image-20220309113854795

请求地址:秒杀的减库存接口。

image-20220309113953340

JVM 锁

了解了上面的配置,然后启动 2 个实例,端口分别为 8080,8090,如下:

image-20220309112256004

如过不知道如何启动 2 个实例的看下面:

image-20220309112521469

注意:要修改启动端口。

使用 JVM 锁也就是同步代码块的方式存在问题,如上面测试的结果如下:

image-20220309135924299

不仅 2 个服务同时存在相同的库存,甚至同一个服务也存在相同的值,很明显在高并发分布式场景下,JVM 层面的锁是不可行的。

Redis SETNX

SETNX

格式:setnx key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

1、实现一个最简单的分布式锁

@RestController
public class SkillController {

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() {

        // 商品 ID,具体应用中应该是请求传入的
        String lockKey = "lock:product_01";
        // SETNX 加锁
        Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product");
        
        // 如果为 false 说明这把锁存在,直接返回
        if (!result) {
            // 模拟返回业务
            return "系统繁忙";
        }
        int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
        if (stock > 0) {
            // 库存 -1
            int realStock = stock - 1;
            // 扣减库存 模拟其他更多业务操作
            redisTemplate.opsForValue().set("stock", realStock + "");
            System.out.println("扣减成功,剩余库存:" + realStock);
        } else {
            System.out.println("扣减失败,库存不足");
        }

        // 加锁后需要释放锁
        redisTemplate.delete(lockKey);
        
        return "8080";
    }
}

2、存在的问题

① 业务代码异常—死锁

在实际的场景中,一次秒杀过程涉及到很多的业务操作,如果在释放锁之前的某个业务操作抛异常,使得锁没有被释放,那么此时就会存在死锁问题。此时这个 key 永远存在于 redis 中,其它线程执行 SETNX 永远失败。

也就是说我们要保证释放锁得到执行,所以要把上面的业务代码放在 try catch 或者 try finally 中:

image-20220309151042583

② Redis 宕机

但其实上面的代码并不一定能完全解决问题,如果 Redis 宕机或者被重启,同样会导致 finally 中的代码执行失败,结果就同上了。

所以一般我们需要给这个 key 设置过期时间,即:

Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product");

redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

但上面的写法存在原子性问题,所以我们不能分开来写,得合成一条命令:

Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, "product", 10, TimeUnit.SECONDS);

③ 高并发可能存在的问题

一般来说,并发量不大的情况下,上面的写法已经满足要求,但对于几千上万的并发,可能会导致接口的响应变慢,比如:

某个请求 A 执行完整个操作需要 15s ,而我们上面设置的超时时间为 10s ,所以此时请求 A 并没有执行完,但由于设置了过期时间把 key 给删掉了,然后这时再来一个请求 B,是可以加锁成功的,且在 8s 内执行完成,而 B 在执行过程中 A 同样也在执行,如果此时 A 可能先于 B 去执行 finally 中的代码把锁给删除了,但 A 删除的锁并不是它的,而是 B 加的锁,同理,当请求 C 加锁后又被请求 B 给释放了,也就是说,这把分布式锁直接无效了(尽管可能性很小),同样会出现超买问题。

这个问题的根本在于:自己加的锁被被别人释放了

因此我们可以确定 value 值唯一性,如 UUID,如下:

image-20220309155219774

但这种方式同样存在原子问题,也就是上图中 ② 处的代码,结果也会导致自己加的锁被被别人释放

Redisson

针对上面的问题,我们可以通过 Redisson 来解决,其使用非常简单,和 JDK 中的 Lock 使用类似,如下:

@RestController
public class RedissonController {

    @Autowired
    private Redisson redisson;

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock1")
    public String deductStock() {

        // 商品 ID,具体应用中应该是请求传入的
        String lockKey = "lock:product_01";

        // 获取锁
        RLock lock = redisson.getLock(lockKey);
        // 加锁
        lock.lock();

        try {
            int stock = Integer.parseInt(redisTemplate.opsForValue().get("stock").toString());
            if (stock > 0) {
                // 库存 -1
                int realStock = stock - 1;
                // 扣减库存
                redisTemplate.opsForValue().set("stock", realStock + "");
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            // 释放锁
            lock.unlock();
        }

        return "8080";
    }
}

再去测试就是正常的了:

image-20220310091303423

Redisson 其原理如下:

image-20220309165722140

Redisson 锁的加锁机制如上图所示,线程去获取锁,获取成功则执行保存数据到 redis 数据库。如果获取失败,则一直通过 while 循环尝试获取锁(可自定义等待时间,超时后返回失败),获取成功后,保存数据到 redis 数据库。

Redisson 提供的分布式锁是支持锁自动续期的(锁续命),也就是说,如果线程仍旧没有执行完,那么 Redisson 会自动给 redis 中的目标 key 延长超时时间,这在 Redisson 中称之为 Watch Dog(看门狗)机制。

那么 redisson 是怎么实现原子性的

当然是 lua。不管是加锁操作,还是看门狗机制都是通过 lua来保证其原子性。

其加锁调用链路如下:

RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->tryLockInnerAsync()

关键代码就在 tryLockInnerAsync() 中:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
    		  // 如果锁不存在,则通过hset设置它的值,并设置过期时间
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果锁已存在,其是当前线程,则通过hincrby给数值递增1,即锁的重入
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              // 如果锁已存在,不是当前线程,则返回过期时间 ttl
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

那么对于锁自动续期如下:

RedissonLock.lock()--->lockInterruptibly()--->tryAcquire()--->scheduleExpirationRenewal()

scheduleExpirationRenewal() 方法会开启一个子线程去执行自动延期的操作,当然也是执行 lua代码,如下,截取关键部分:

// getName()就是当前锁的名字 
RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        // 判断这个锁 getName() 是否在redis中存在,如果存在就进行 pexpire 延期 默认30s
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
        "end; " +
        "return 0;",
          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

他会判断这个锁 getName() 是否在 redis 中存在,如果存在就进行 pexpire 延期,默认lockWatchdogTimeout=30s,且是每间隔lockWatchdogTimeout/3=10s时间,去执行延时操作。

源码:https://gitee.com/javatv/redis.git

参考:redisson 中的看门狗机制

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 22:37:16  更:2022-03-15 22:37:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 18:45:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码