IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis实现分布式锁---业务真实使用 -> 正文阅读

[大数据]Redis实现分布式锁---业务真实使用

分布式场景问题

商品超卖问题

在这里插入图片描述

这个问题是做项目时候学习到的,例如下单流程:同一个节点中的并发访问安全可以使用synchnized来保证线程的安全,但是不同节点在同一个业务这是无法保证安全的。

怎么解决分布式并发问题

使用redis实现分布式的锁

将上面的步骤进行一个拆分:

在这里插入图片描述

总的来说就是在做这些操作之前一定需要对每一个商品进行上锁,这样其他节点就无法操作。

使用Redis实现

这里使用项目中的真实案例来实现这个过程。一个下单流程代码如下:

/**
* 1.根据购物车选中的cid来查询购物车记录信息
* 2.校验库存
* 3.生成订单信息
* 4.生成快照信息
* 5.扣减库存
* 6.删除购物车中记录(已生成订单的就可以直接删除)
*/
public Map<String,String> addOrder(String cids, Orders order) {
    Map<String,String> map = new HashMap<>();

    //1.根据购物车选中的cid来查询商品信息
    String[] split = cids.split(",");
    List<Integer> list = new ArrayList<>();
    for (String s : split) {
        list.add(Integer.parseInt(s));
    }
    List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);

    /**
     * 将所有的skuid写入redis ,有一个不满足就不能进行上锁
     */
    boolean isLock = true;
    String skuIds[] = new String[shoppingCartVOS.size()];
    for (int i = 0; i < shoppingCartVOS.size(); i++) {
        String skuId = shoppingCartVOS.get(i).getSkuId();
        //进行上锁
        Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent("sheep");
        if (ifAbsent){
            skuIds[i] = skuId;
        }
        isLock = isLock && ifAbsent;
    }
    //如果每个商品都没有被其他人操作,既可以进行操作,否则就要释放每一个上过的Lock
    if(isLock){
        try {
            //2.校验库存 --> shoppingCartVOS 中的cartNum < stock即可
            //这里需要再次查询是第一次查询是没有加锁,如果查完被别人先加锁修改了,那么这里的库存就亏不对,所以需要再次查询
            shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
            boolean stockFlag = true;
            for (ShoppingCartVO cartVO : shoppingCartVOS) {
                if(Integer.parseInt(cartVO.getCartNum()) > cartVO.getSkuStock()){
                    stockFlag = false;
                    break;
                } 
            }
			//其他业务细节这里屏蔽,只体现上锁和解锁过程
            if(stockFlag){
                //3. 保存订单:
                //4.生成快照信息OrderItem
                //扣减库存
                //删除购物车记录
                map.put("orderId",orderId);
                map.put("productNames",untitled);
                //之后返回信息
                return map;
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //释放锁
            for (int i = 0; i < skuIds.length; i++) {
                String skuId = skuIds[i];
                if (skuId != null && !"".equals(skuId)){
                    redisTemplate.delete(skuId);
                }
            }
        }
    } else {
        //释放锁
        for (int i = 0; i < skuIds.length; i++) {
            String skuId = skuIds[i];
            if (skuId != null && !"".equals(skuId)){
                redisTemplate.delete(skuId);
            }
        }
    }

    return null;
}

这里依然还有几个问题:

  • 如果订单中部分商品加锁成功,但是某一个加锁失败就需要全部释放
  • 如果加锁之后,出现了异常导致这个线程销毁,锁谁来释放?

解决锁无法释放问题

这里根据上面的描述不难理解其实可以通过Redis中设置超时时间来解决,这样到点必定会释放锁资源。

但是这样又出现了另一个问题,假如有线程 t1 和 t2 ,t1加锁设置超时时间,然后由于一些特殊原因运行比较慢,没有处理完业务锁过期了,此时 t2 进行加锁,但是 t1 处理完后会释放锁,这样会把 t2 的给释放掉,那么其他线程又可以进行操作造成并发问题。如图:

在这里插入图片描述

解决t1释放t2锁问题

根据上面问题,其实是可以联想到,释放锁的时候去检查这个锁是不是我自己锁住的。上面的代码中不难发现,加锁的时候value是固定的,可以从这里入手,使用一个map集合将这个加锁的商品保存起来,key为加锁的商品id,value是随机生成的字符串(尽可能保证唯一即可)。释放的时候就可以从redis获取,如果有值并且值就是我自己生成的那个字符串才进行释放。

代码根据上面的业务代码进行改造,可以这样设计:

//加锁改造如下:
boolean isLock = true;
String skuIds[] = new String[shoppingCartVOS.size()];
Map<String,String> values = new HashMap<>();
for (int i = 0; i < shoppingCartVOS.size(); i++) {
    String skuId = shoppingCartVOS.get(i).getSkuId();
    
    //这是不再固定
    String uuid = UUID.randomUUID().toString();
    //进行上锁
    Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent(uuid,10, TimeUnit.SECONDS);
    if (ifAbsent){
        skuIds[i] = skuId;
        values.put(skuId,uuid);
    }
    isLock = isLock && ifAbsent;
}
//如果每个商品都没有被其他人操作,既可以进行操作,否则就要释放每一个上过的Lock
if(isLock){}

//释放锁改造
for (int i = 0; i < skuIds.length; i++) {
    String skuId = skuIds[i];
    if (skuId != null && !"".equals(skuId)){
        
        //从redis中获取值比较与加锁时候是否一直
        String value = redisTemplate.boundValueOps(skuId).get();
        if (value != null && map.get(skuId).equals(value)){
            redisTemplate.delete(skuId);
        }
    }
}

这里,又有一个问题,这一切看起来好像都完美,但是,释放锁这里依然会有并发问题,比如:

t1 在释放锁的时候if判断完成,正要执行删除锁的代码,结果锁过期了,然后别人拿到这个锁,结果 t1 还是可以释放掉别人的锁,这就是一个问题,这里就需要保证判断和释放应该同时操作,也就是这两个操作要具有原子性

使用lua脚本解决

首先在resource目录下创建脚本文件:unlock.lua,这个lua脚本的意思就是从redis中查询传过来的keys[1]内容的并且跟传过来的ARGV[1]是否相等,如下:

if redis.call("get",KEYS[1]) == ARGV[1] then
 return redis.call("del",KEYS[1])
else
 return 0
end

配置Bean加载lua脚本:

@Bean
public DefaultRedisScript<List> defaultRedisScript(){
    DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
    defaultRedisScript.setResultType(List.class);
    defaultRedisScript.setScriptSource(new ResourceScriptSource(new                                                         ClassPathResource("unlock.lua")));
    return defaultRedisScript; 
}

通过执?lua脚本解锁:

//执?lua脚本
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = stringRedisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
System.out.println(rs.get(0));

业务代码释放锁测操作可以改为:(keys中包含skuId这个脚本会查询这个的value,后面的values.get(skuId)就是需要比较的值)

//释放锁
for (int i = 0; i < skuIds.length; i++) {
    String skuId = skuIds[i];
    if (skuId != null && !"".equals(skuId)){
        List<String> keys = new ArrayList<>();
        keys.add(skuId);
        List rs = redisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
        System.out.println(rs.get(0));
    }
}

分布式锁框架-Redisson

看门狗机制

线程:?于给当前key延?过期时间,保证业务线程正常执?的过程中,锁不会过期。

在这里插入图片描述

说白了就是一个守护线程。

Redission介绍

Redisson在基于NIO的Netty框架上,充分的利?了Redis键值数据库提供的?系列优势,在Java实??具包中常?接?的基础上,为使?者提供了?系列具有分布式特性的常??具类。使得原本作为协调单机多线程并发程序的?具包获得了协调分布式多机多线程并发系统的能?,??降低了设计和研发?规模分布式系统的难度。同时结合各富特?的分布式服务,更进?步简化了分布式环境中程序相互之间的协作。

使用SpringBoot集成

首先引入依赖:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

然后配置如下:

redisson:
   addr:
     singleAddr:
       host: redis://47.96.11.185:6370
       password: 12345678
       database: 0

配置RedissionClient:

@Configuration
public class RedissonConfig {
    @Value("${redisson.addr.singleAddr.host}")
    private String host;
    @Value("${redisson.addr.singleAddr.password}")
    private String password;
    @Value("${redisson.addr.singleAddr.database}")
    private int database;
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config(); 
        config.useSingleServer().
               setAddress(host).
               setPassword(password).
               setDatabase(database);
        return Redisson.create(config);
    }
}

其他使用配置

单机连接配置参考上面即可

集群连接配置如下:

redisson:
   addr:
     cluster:
       host: redis://47.96.11.185:6370, ... ,redis://47.96.11.185:6373
       password: 12345678

客户端配置:

@Bean
public RedissonClient redissonClient(){
    Config config = new Config();
    config.useClusterServers()
        .addNodeAddress(hosts.split("[,]"))
        .setPassword(password)
        .setScanInterval(2000)
        .setMasterConnectionPoolSize(10000)
        .setSlaveConnectionPoolSize(10000);
    return Redisson.create(config);
}

主从配置如下:

redisson:
	addr:
		masterAndSlave:
            masterhost: redis://47.96.11.185:6370
            slavehosts: redis://47.96.11.185:6371,redis://47.96.11.185:6372
            password: 12345678
            database: 0

客户端配置:

@Configuration
public class RedissonConfig3 {
    @Value("${redisson.addr.masterAndSlave.masterhost}")
    private String masterhost;
    @Value("${redisson.addr.masterAndSlave.slavehosts}")
    private String slavehosts;
    @Value("${redisson.addr.masterAndSlave.password}")
    private String password;
    @Value("${redisson.addr.masterAndSlave.database}")
    private int database;
 
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
        config.useMasterSlaveServers()
            .setMasterAddress(masterhost)
            .addSlaveAddress(slavehosts.split("[,]"))
            .setPassword(password)
            .setDatabase(database)
            .setMasterConnectionPoolSize(10000)
            .setSlaveConnectionPoolSize(10000);
        return Redisson.create(config);
    }

至此几种Redis的模式都涉及到了

Redission使用‘

  1. 获取公平锁和非公平锁
//获取公平锁
RLock lock = redissonClient.getFairLock(skuId);
//获取?公平锁
RLock lock = redissonClient.getLock(skuId);
  1. 加锁——阻塞和非阻塞锁
//阻塞锁(如果加锁成功之后,超时时间为30s;加锁成功开启看?狗,剩5s延?过期时间)
lock.lock();
//阻塞锁(如果加锁成功之后,设置?定义20s的超时时间)
lock.lock(20,TimeUnit.SECONDS);
//?阻塞锁(设置等待时间为3s;如果加锁成功默认超时间为30s)
boolean b = lock.tryLock(3,TimeUnit.SECONDS);
//?阻塞锁(设置等待时间为3s;如果加锁成功设置?定义超时间为20s)
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);

//释放锁
lock.unlock();
  1. 公平非阻塞锁
//公平?阻塞锁
RLock lock = redissonClient.getFairLock(skuId);
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);

至此几种基本的锁使用都涉及到了

上面业务代码的优化

首先先抽象出这个业务结构:

//伪代码如下:
HashMap map = null;
加锁
try{
    if(isLock){
        校验库存
        if(库存充?){
              保存订单
              保存快照
              修改库存
              删除购物?
              map = new HashMap();
              ...
        }
    }
}catch(Exception e){
    e.printStackTrace();
}finally{
    释放锁
}
return map;

在上面的业务代码中就可以优化成这个写法

/**
* 1.根据购物车选中的cid来查询购物车记录信息
* 2.校验库存
* 3.生成订单信息
* 4.生成快照信息
* 5.扣减库存
* 6.删除购物车中记录(已生成订单的就可以直接删除)
*/
public Map<String,String> addOrder(String cids, Orders order) {
    Map<String,String> map = null;

    //1.根据购物车选中的cid来查询商品信息
    String[] split = cids.split(",");
    List<Integer> list = new ArrayList<>();
    for (String s : split) {
        list.add(Integer.parseInt(s));
    }
    List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);

    /**
    * 将所有的skuid写入redis ,有一个不满足就不能进行上锁
    */
    boolean isLock = true;
    String skuIds[] = new String[shoppingCartVOS.size()];
    Map<String,RLock> locks = new HashMap<>();
    for (int i = 0; i < shoppingCartVOS.size(); i++) {
        String skuId = shoppingCartVOS.get(i).getSkuId();
        //进行上锁,这里就直接使用Redisson进行操作
        try {
            RLock lock = redissonClient.getFairLock(skuId);
            boolean ifAbsent = false;
            ifAbsent = lock.tryLock(3,20, TimeUnit.SECONDS);
            if (ifAbsent){
                skuIds[i] = skuId;
                locks.put(skuId,lock);
            }
            isLock = isLock && ifAbsent;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    //如果每个商品都没有被其他人操作,既可以进行操作,否则就要释放每一个上过的Lock
    if(isLock){
        try {
            //2.校验库存 --> shoppingCartVOS 中的cartNum < stock即可
            //这里需要再次查询是第一次查询是没有加锁,如果查完被别人先加锁修改了,那么这里的库存就亏不对,所以需要再次查询
            shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
            boolean stockFlag = true;
            String untitled = "";   //订单中产品的名字,如果有多个就用 “,” 分割
            for (ShoppingCartVO cartVO : shoppingCartVOS) {
                if(Integer.parseInt(cartVO.getCartNum()) > cartVO.getSkuStock()){
                    stockFlag = false;
                    break;
                }
                untitled += cartVO.getProductName() + ",";
            }

            if(stockFlag){
                //3. 保存订单:
                //4.生成快照信息
                //扣减库存
                map = new HashMap<>();
                map.put("orderId",orderId);
                map.put("productNames",untitled);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //释放锁
            for (int i = 0; i < skuIds.length; i++) {
                String skuId = skuIds[i];
                if (skuId != null && !"".equals(skuId)){
                    locks.get(skuId).unlock();
                    System.out.println("-----------------------unlock");
                }
            }
        }
    }
    return map;
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:05:37  更:2022-04-15 00:06:25 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:09:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码