IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 关于分布式锁的续命问题——基于Redis实现的分布式锁 -> 正文阅读

[大数据]关于分布式锁的续命问题——基于Redis实现的分布式锁

目录

一、背景

二、自定义实现

三、Redisson框架实现


一、背景

关于分布式锁就不多说了,现在出现了一种场景,如果在分布式锁中,业务代码没有执行完,然后锁的键值过期了,那么其他的JVM就可能会获取到锁而发生幂等的问题。那么这种情况怎么解决呢?

1、如果线程1获取到了锁,那么在业务代码还没执行完时,redis键值过期了,那么就会发生幂等问题。解决的思路是,当线程1获取到锁之后,开启一个线程去监听线程1是否执行完成,如果没有执行完成,就去延长键值的过期时间。

2、如果因为某些原因(比如代码bug,网络不稳定),锁一直不能释放,然后就要一直进行延长过期时间么?答案是否定的,我们可以设置延长时间的次数,如果超过设定的次数还是失败,就自动释放锁,然后回滚业务。

3、释放锁时需要考虑的问题,谁加的锁,就让谁来释放锁。

二、自定义实现

锁信息实体类

package com.xiaojie.lock;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 记录锁信息
 * @date 2022/6/9 17:44
 */
public class RedisLockInfo {
    /**
     * 锁的状态 state为true 则表示获取锁成功
     */
    private boolean state;
    /**
     * 锁的id
     */
    private String lockId;
    /**
     * 锁的持有线程
     */
    private Thread lockThread;

    /**
     * 锁的过期时间
     */
    private Long expire;

    /**
     * 续命次数
     */
    private AtomicInteger lifeCount;

    /**
     * 获取锁次数
     */
    private AtomicInteger lockCount;

    // 锁的可重入次数
    public RedisLockInfo(String lockId, Thread lockThread, Long expire) {
        state = true;
        this.lockId = lockId;
        this.lockThread = lockThread;
        this.expire = expire;
        lifeCount = new AtomicInteger(0);
        lockCount = new AtomicInteger(0);
    }

    public RedisLockInfo(Thread lockThread, Long expire) {
        this.lockThread = lockThread;
        this.expire = expire;
        lifeCount = new AtomicInteger(0);
        lockCount = new AtomicInteger(0);
        state = true;
    }

    public RedisLockInfo() {
        state = true;
    }

    public String getLockId() {
        return lockId;
    }

    public boolean isState() {
        return state;
    }

    public Thread getLockThread() {
        return lockThread;
    }

    public Long getExpire() {
        return expire;
    }

    //重试的次数
    public Integer getLifeCount() {
        return lifeCount.incrementAndGet();
    }

    //锁获取的次数
    public Integer incrLockCount() {
        return lockCount.incrementAndGet();
    }

    //释放锁次数
    public Integer decreLockCount() {
        return lockCount.decrementAndGet();
    }
}

获取锁的方法和续命方法

package com.xiaojie.lock.impl;

import com.xiaojie.lock.RedisLock;
import com.xiaojie.lock.RedisLockInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 实现分布式锁的实现类
 * @date 2022/6/9 18:05
 */
@Component
@Slf4j
public class RedisLockImpl implements RedisLock {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private String redisLockKey = "redisLockKey";
    /**
     * 缓存redis锁
     */
    private static Map<Thread, RedisLockInfo> lockCacheMap = new ConcurrentHashMap<>();
    /**
     * 重试时间
     */
    private Long timeout = 3000L;
    private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();


    @Override
    public boolean tryLock() {
        Thread currentThread = Thread.currentThread();
        RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
        //判断是不是当前线程获取到了锁
        if (null != redisLockInfo && redisLockInfo.isState()) {
            //证明已经获取到了锁,锁重入
            log.info(">>>>>>>>>>>>>>>>>已经获取到了锁");
            redisLockInfo.incrLockCount(); //获取锁的次数加1
            return true;
        }
        Long startTime = System.currentTimeMillis();
        Long lockExpire = 30l; //键值的过期时间
        //重试获取锁
        for (; ; ) {
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(redisLockKey, "1", lockExpire, TimeUnit.SECONDS);
            if (lock) {
                //获取锁成功
                log.info(">>>>>>>>>>>>>>获取锁成功");
                lockCacheMap.put(currentThread, new RedisLockInfo(currentThread, lockExpire));
                return true;
            }
            // 控制一个超时的时间
            Long endTime = System.currentTimeMillis();
            if (endTime - startTime > timeout) {
                log.info("在3秒内已经过了重试时间了>>>>>>>>>>>");
                return false;
            }
            // 继续循环
            try {
                Thread.sleep(10); //休眠避免CPU飙高
            } catch (Exception e) {

            }
        }
    }

    @Override
    public boolean releaseLock() {
        Thread currentThread = Thread.currentThread();
        RedisLockInfo redisLockInfo = lockCacheMap.get(currentThread);
        if (null != redisLockInfo && redisLockInfo.isState()) {
            if (redisLockInfo.decreLockCount() <= 0) {
                lockCacheMap.remove(currentThread);
                //删除键值
                stringRedisTemplate.delete(redisLockKey);
                return true;
            }
        }
        return false;
    }

    public RedisLockImpl() {
        //开始定时任务实现续命,每3秒续命一次
        this.scheduledExecutorService.scheduleAtFixedRate(new LifeExtensionThread(), 0, 3, TimeUnit.SECONDS);
    }

    //定义监听线程
    class LifeExtensionThread implements Runnable {
        @Override
        public void run() {
            log.info("开始续命线程>>>>>>>>>>>>>>>>>>>>>>>>>");
            if (lockCacheMap.size() > 0) {
                lockCacheMap.forEach((k, lockInfo) -> {
                    try {
                        Thread lockServiceThread = lockInfo.getLockThread();
                        if (lockServiceThread.isInterrupted()) {
                            //判断线程是否执行完毕
                            log.info("当前线程已经被停止>>>>>>>>>>>>>>>");
                            lockCacheMap.remove(k); //从缓存中移除线程
                            return;
                        }
                        // 续命键值过期时间
                        Integer lifeCount = lockInfo.getLifeCount();
                        if (lifeCount > 5) {
                            log.info(">>>>>>>>>>>>>>>>>您已经续命了多次当前线程还没有释放锁,现在主动将该锁释放 避免死锁的问题");
                            // 1.回滚业务(例如事务回滚)
                            // 2.释放锁
                            releaseLock();
                            // 3.将该线程主动停止
                            lockServiceThread.interrupt();
                            // 4.移除监听
                            lockCacheMap.remove(k);
                            return;
                        }
                        //提前实现续命 延长过期key的时间
                        stringRedisTemplate.expire(redisLockKey, lockInfo.getExpire(), TimeUnit.SECONDS);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
}

模拟效果使用 Jemeter模拟秒杀的业务

package com.xiaojie.controller;

import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 秒杀的接口
 * @date 2022/6/9 22:55
 */
@RestController
public class SeckillController {
    @Autowired
    private GoodsMapper goodsMapper;
    @Autowired
    private RedisLock redisLock;

    @GetMapping("/secKill")
    public String secKill(Long id) {
        try {
            if (!redisLock.tryLock()) {
                //获取锁失败
                return "活动太火了,请稍后重试";
            }
            Goods goods = goodsMapper.selectById(id);
            if (null == goods) {
                return "该商品没有秒杀活动";
            }
            if (goods.getStock() <= 0) {
                return "商品库存为空了。。。。。";
            }
            //减库存
            Integer result = goodsMapper.deceGoods(id);
            return result > 0 ? "秒杀成功" : "秒杀失败";
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            redisLock.releaseLock();
        }
        return "fail";
    }
}

三、Redisson框架实现

图片来源? ?https://blog.csdn.net/menxu_work/article/details/123827526 ?

1、Redisson向redis写入键值的时候不再使用setNx的命令,而是使用Lua脚本的形式写入

2、写入键值成功之后会设置一个后台线程(看门狗线程)来对键值进行续命,默认值是每10秒续命一次,续命时间为30秒

3、键值的类型为hash类型,如下图,其中1是锁入的次数

?4、释放锁的时候,也是通过Lua脚本的方式,当锁入的次数小于0时,就会删除键值。

源码分析

加锁代码

 private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
    //如果没有过期时间,证明键值不存在,直接返回
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

//键值存在,自旋方式去获取锁
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
//取消订阅
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }


  <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " + //判断键值是否存在,==0不存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + //创建哈希类型键值,并设置值为1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +//设置过期时间 ,默认是30s
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//键值存在
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//将键值的value值加1
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " + //设置过期时间为30s
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);", //键值存在,并且不是重入锁,则返回键值剩余存活时间
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

释放锁的代码

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //键值不存在则表示锁已经释放了
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //简直存在,则键值减1
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " + //如果减1后键值还是存在,则重新设置过期时间
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " + //删除键值
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

续命的代码

?模拟秒杀代码实现

package com.xiaojie.redisson;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xiaojie
 * @version 1.0
 * @description: reddisson配置
 * @date 2022/6/10 2:05
 */
@Configuration
public class RedissonConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private String port;
    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        //设置看门狗时间
//        config.setLockWatchdogTimeout(30000L);
        //设置单机版本redis
        config.useSingleServer().setAddress("redis://" + host + ":" + port);
        //设置密码
        config.useSingleServer().setPassword(password);
        //设置集群的方式
//        config.useClusterServers().addNodeAddress("redis://" + host + ":" + port);
//        config.useClusterServers().addNodeAddress("redis://" + host2 + ":" + port2);
        //添加主从配置
//      config.useMasterSlaveServers().setMasterAddress("").setPassword("").addSlaveAddress(new String[]{"",""});
        return Redisson.create(config);
    }

}

秒杀的接口

package com.xiaojie.controller;

import com.xiaojie.entity.Goods;
import com.xiaojie.lock.RedisLock;
import com.xiaojie.mapper.GoodsMapper;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 秒杀的接口
 * @date 2022/6/9 22:55
 */
@RestController
public class RedissonSeckillController {
    @Autowired
    private GoodsMapper goodsMapper;
    @Autowired
    private RedissonClient redissonClient;

    @GetMapping("/secKillRedisson")
    public String secKillRedisson(Long id) {
        RLock rLock = null;
        try {
            rLock = redissonClient.getLock(id + "");
            rLock.lock(); //加锁,加几次锁,finally释放锁的时候就要释放几次锁
            Goods goods = goodsMapper.selectById(id);
            if (null == goods) {
                return "该商品没有秒杀活动";
            }
            if (goods.getStock() <= 0) {
                return "商品库存为空了。。。。。";
            }
            //减库存
            Integer result = goodsMapper.deceGoods(id);
            return result > 0 ? "秒杀成功" : "秒杀失败";
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rLock.unlock();
        }
        return "fail";
    }
}

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-06-14 22:40:05  更:2022-06-14 22:42:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 3:40:53-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码