IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 分布式锁(MySQL&Redis) -> 正文阅读

[大数据]分布式锁(MySQL&Redis)

分布式锁(MySQL&Redis)


学习准备

gitee代码:https://gitee.com/naruto12138/distributed_lock.git

1. 传统锁

1.1搭建一个减库存的简单案例工具

创建一个Springboot测试项目

新建对象Stock

package com.example.distributedlock.pojo;

import lombok.Data;

@Data
public class Stock {
        private Integer stock =5000;
}

Service:

package com.example.distributedlock.service;

import com.example.distributedlock.pojo.Stock;
import org.springframework.stereotype.Service;

@Service
public class StockService {
    private Stock stock = new Stock();//Service初始化时便已存在

    public synchronized void deduct(){
        stock.setStock(stock.getStock()-1);
        System.out.println("库存余量"+stock.getStock());
    }
}

controller:

package com.example.distributedlock.controller;

import com.example.distributedlock.service.StockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StockController {
    @Autowired
    private StockService service;

    @GetMapping("stock/deduct")
    public  String  dedduct(){
        service.deduct();
        return "Hello";
    }
}

使用jemter测试:

  1. 新建线程组
  1. 添加请求:

  1. 开启日志

点击启动

发现余量顺利减少到零

也可修改service:用ReentrantLock来加锁

package com.example.distributedlock.service;

import com.example.distributedlock.mapper.StockMapper;
import com.example.distributedlock.pojo.Stock;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.concurrent.locks.ReentrantLock;

@Service
public class StockService {
    private Stock stock = new Stock();//Service初始化时便已存在

    private ReentrantLock lock = new ReentrantLock();

    public void deduct(){
        lock.lock();
        try {
            stock.setStock(stock.getStock() - 1);
            System.out.println("库存余量" + stock.getStock());
        }finally {
            lock.unlock();
        }
    }
}

1.2 改造代码见MySQL中的库存

通常共享资源存在于服务外部,例如MySQL的数据

MySQL提供了乐观和悲观锁,但redis没有设置

为了方便,直接使用mybatis-plus

添加pom.xml依赖

        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.3.4</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

创建数据库:

  1. 添加mapper

    package com.example.distributedlock.mapper;
    
    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.example.distributedlock.pojo.Stock;
    
    public interface StockMapper extends BaseMapper<Stock> {
    }
    
  2. 修改stock

    package com.example.distributedlock.pojo;
    
    import com.baomidou.mybatisplus.annotation.TableName;
    import lombok.Data;
    
    
    @TableName("db_stock")
    @Data
    public class Stock {
            private Long id;
            private String productCode;
            private String warehouse;
            private Integer count;
    }
    
  3. 修改service

    @Autowired
    private StockMapper stockMapper;
    
    public void deduct(){
        try {
            Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code","1001"));
            if(stock !=null && stock.getCount()>0){
                stock.setCount(stock.getCount() -1);
                stockMapper.updateById(stock);
            }
        }
        finally {
    
        }
    }
    
  4. 重新测试:

    发送五千条

    会发现:

    出现了安全性问题

为什么会出现这样的问题呢?

我们的极限可能是:5000~9950其中的一个随机值

但如果我们对service像之前一样加锁会如何?

修改service层:

    @Autowired
    private StockMapper stockMapper;
    private ReentrantLock lock = new ReentrantLock();
    public void deduct(){
        lock.lock();
        try {
            Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code","1001"));
            if(stock !=null && stock.getCount()>0){
                stock.setCount(stock.getCount() -1);
                stockMapper.updateById(stock);
            }
        }
        finally {
            lock.unlock();
        }
    }

成功减少为4891

1.3 三种情况有可能导致MySQL锁机制失效

1.3.1 多例模式

将代码改装成多例模式

我们会发现速度明显提高:

锁机制失效!

事务:使用Read Uncommitted可以解决,但不推荐使用

1.3.2 集群部署(部署在多台服务器)

再通过nginx实现集群部署负载均衡,此时也会发生

1.3.3 存在事务

例如使用@Transactional回滚时,RC读取发生错误,修改的数据还未上传,此时就已经被读取。

1.4 通过MySQL语句自带锁解决

在对MySQL操作时update、insert、delete写操作本身就会加锁

使用语句:

UPDATE db_stock SET COUNT = COUNT - 1 WHERE product_code ='1001' AND COUNT >=1 

在mapper上添加一个新update方法

package com.example.distributedlock.mapper;

import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.distributedlock.pojo.Stock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;

public interface StockMapper extends BaseMapper<Stock> {
    @Update("UPDATE db_stock SET COUNT = COUNT - #{count} WHERE product_code =#{productCode} AND count >= #{count}  ")
    int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);
}

对Serveice进行修改

@Autowired
private StockMapper stockMapper;
private ReentrantLock lock = new ReentrantLock();

public void deduct(){
    try {
        stockMapper.updateStock("1001",1);
    }
    finally {
    }
}

再用nginx进行负载均衡;

测试发现,正好减少了5000,且速度更快

1.5 SQL语句的优缺点

优点:上述三个问题都可以解决

缺点:

  • 锁的范围问题
  • 缺少逻辑性,具有局限性
  • 无法记录库存前后的变化数量,再日志上不存在

锁的范围问题

悲观锁是一个行级锁,回

  1. 锁的查询或者更新条件必须是索引字段
  2. 查询或者更新条件必须是具体值

1.6 MySQL悲观锁使用select…update from

为了解决我们的悲观锁发生的问题

表中新添加一个数据

给mapper新添加一个方法使用select…update from语法实现悲观锁

package com.example.distributedlock.mapper;

import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.distributedlock.pojo.Stock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;

import java.util.List;

public interface StockMapper extends BaseMapper<Stock> {
    @Update("UPDATE db_stock SET COUNT = COUNT - #{count} WHERE product_code =#{productCode} AND count >= #{count}  ")
    int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);

    @Select("select * from db_stock where product_code=#{productCode} for update")
    List<Stock> queryStock(String productCode);
}

修改Service

这里必须要使用@Transactional实现回滚

@Autowired
private StockMapper stockMapper;

@Transactional
public void deduct(){
    try {
        //1. 查询库存信息并锁定库存信息
        List<Stock> stocks = stockMapper.queryStock("1001");
        //取第一个库存
        Stock stock = stocks.get(0);
        //判断库存是否充足
        if(stock!=null && stock.getCount()>0){
            //2.扣减库存
            stock.setCount(stock.getCount()-1);
            stockMapper.updateById(stock);
        }

    } finally {
    }
}

再重启两个服务器测试,通过nginx负载

最后用jemter测试,会发现速度比以往都慢很多(性能低),但成功实现了并发,并且更加灵活,并且可以记录库存变化前后的状态,方便我们的操作

1.7 select…update from存在的问题

  1. 如上所述,性能太差

  2. 会出现死锁,对多条数据进行操作时,需要注意加锁的顺序,保证加锁顺序一致

    • 例如:有A、B两个对象,希望操作锁1、锁2
      1. A获取了锁1,未释放
      2. B获取了锁2,未释放
      3. 下一步A希望获取锁2、B希望获取锁1
      4. 此时发生死锁,A、B锁死
    1. 库存操作需要统一:select…update from操作表,则其他队表的操作最好不要使用select

      此时锁失效,发生并发问题

1.8 MySQL乐观锁

时间戳/version版本号CAS机制

CAS: Compare And Swap 比较并交换

例如: A、B两个对象希望对一个数据α进行操作:

  • A获取α和α的时间戳
  • B获取α和α的时间戳
  • B修改α,并更新α时间戳
  • A希望更新α,但发现此前获取的时间戳与目前的时间戳不一致,A放弃更新,重新获取
  • 直到A获取的时间戳与查询到的时间戳一致,则对α进行改变。

实现一个乐观锁

给表添加一个version的字段

直接修改service层: @Transactional需要注释掉,不可回滚会发生错误

@Autowired
private StockMapper stockMapper;

//    @Transactional
public void deduct(){
    //1. 查询库存信息并锁定库存信息(这里使用普通的select)
    List<Stock> stocks = stockMapper.selectList(new QueryWrapper<Stock>().eq("product_code","1001"));
    //取第一个库存
    Stock stock = stocks.get(0);
    //判断库存是否充足
    if(stock != null && stock.getCount() > 0){
        //2.扣减库存
        stock.setCount(stock.getCount() - 1);
        Integer version = stock.getVersion();
        stock.setVersion(version + 1);
        //如果更新失败
        if(stockMapper.update(stock,new UpdateWrapper<Stock>().
                              eq("id",stock.getId()).
                              eq("version",version))==0){
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            deduct();//递归重试
        }
    }
}

1.9 乐观锁存在的问题

  1. 高并发情况下,性能极低,而且是越往后越低!!!

  2. ABA问题

    比如说查找时是A,但是有人修改成了B,但是又有人通过非法操作,返回成了A,导致版本故障。

  3. 读写分离情况下,导致乐观锁不可靠

    读取会有一个缓存流,高强度下,乐观锁会发生错误

1.10 MySQL锁总结

性能:一个sql>悲观锁>JVM锁>乐观锁

  • 追求机制性能、业务场景简单闭关且不需要记录数据前后变化的情况下。优先使用一个SQL

  • 如果写并发量较低(多读),争抢不是很强烈的情况下优先:乐观锁(无死锁)

  • 如果并发量高,一般容易冲突,使用mysql悲观锁

  • 不推荐jvm本地锁

2. 基于redis的分布式锁

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

添加properties配置,默认端口号6379,应该没有人会改变吧

spring.redis.host=localhost

重写Sercvice的方法:

@Autowired(required = false)
private StringRedisTemplate redisTemplate;

public void deduct(){
    //1. 查询库存信息并锁定库存信息(这里使用普通的select)
        String stock = redisTemplate.opsForValue().get("stock");
    //2.判断库存是否充足
    if(stock != null && stock.length() > 0) {
        Integer integer = Integer.valueOf(stock);
        if(integer>0){
            //3.扣减库存
            redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
        }
    }
}

再redis里设置stock为5000

压力测试一下,发现最后并没有减少到零

这时我们就需要进行锁操作

  1. jvm 、单例、synchronized解决,不再演示
  2. 使用redis乐观锁
  3. 分布式锁机制

2.1 redis乐观锁

watch指令:可以监控一个或者多个key的值,如果再事务(exec)执行之前,key的值发生变化则去校事务执行

multi:开启事务

exec:提交事务

    @Autowired(required = false)
    private StringRedisTemplate redisTemplate;

    public void deduct(){
        //Sessioncallback:允许事务,开启指令
        redisTemplate.execute(new SessionCallback<Object>() {
            @Override//RedisOperations本质上就算一个RedisTemplate
            public <K, V> Object execute(RedisOperations<K,V> operations) throws DataAccessException {
                //watch
                operations.watch((K) "stock");
                //1. 查询库存信息并锁定库存信息(这里使用普通的select)
                String stock = operations.opsForValue().get("stock").toString();
                //2.判断库存是否充足
                if(stock != null && stock.length() > 0) {
                    Integer integer = Integer.valueOf(stock);
                    if(integer>0){
                        //multi
                        operations.multi();
                        //3.扣减库存
                        operations.opsForValue().set((K)"stock",(V)String.valueOf(--integer));
                        //exec
                        List<Object> exec = operations.exec();
                        if(exec == null|| exec.size()==0){
                            try {
                                Thread.sleep(100);
                                deduct();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        return exec;
                    }
                }
                return null;
            }
        });

    }

缺点:性能问题,redis的性能消耗过大!

2.2 redis 分布式锁

可以做到跨服务、跨进程、跨服务器

应用场景:

  • 超卖现象(NoSQL)
  • 缓存击穿

缓存击穿

MySQL是放在硬盘上的数据,为了访问便捷,我们可以使用redis来提高访问速度(通过缓存)

缓存击穿:一个热点key过期,导致系统奔溃。

过期时间:为了防止服务器因缓存过多而爆满。

当一个很热门的key过期了,大量的访问数据在redis找不到这个值,就到MySQL

里进行访问,此时MySQL难以承受而导致宕机。

解决方案:添加一个锁机制,对访问进行以此处理

此时,JVM锁十分不便,会因为一条请求而耽误其他的请求。

分布式锁的实现

  1. 基于redis实现
  2. 基于zookeeper/etcd实现
  3. 基于MySQL实现

特征:

  1. 独占排他使用

    1. 上锁

      setnx

    2. 解锁

      del

    3. 重试:

      1. 递归

      2. 循环

  2. 防止死锁

  3. 原子性:

    获取锁和过期时间

  4. 防错误删除

    先判断再删除。

    在key中添加uuid

  5. 自动续期

    防止业务逻辑没有完全进行

首先我们先来使用递归

@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
    //加锁 如果不存在这个名字的锁,则加一个
    Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");

    //重试:递归调用
    if(!lock){
        try {
            Thread.sleep(20);
            deduct();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    else{
        try {
            //1,查询库存信息
            String stock = redisTemplate.opsForValue().get("stock").toString();

            //2.判断库存是否充足
            if(stock != null && stock.length() !=0){
                Integer integer = Integer.valueOf(stock);
                if(integer>0){
                    //3. 扣减库存
                    redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁
            redisTemplate.delete("lock");
        }

    }
}

成功!!

递归的坏处:引发栈内存溢出出错

,因此我们需要改成循环

CAS自旋锁!

@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
    //重试:递归调用
    while( !redisTemplate.opsForValue().setIfAbsent("lock", "111")){
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    try {
        //1,查询库存信息
        String stock = redisTemplate.opsForValue().get("stock").toString();

        //2.判断库存是否充足
        if(stock != null && stock.length() !=0){
            Integer integer = Integer.valueOf(stock);
            if(integer>0){
                //3. 扣减库存
                redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
            }
        }
    } finally {
        // 解锁
        redisTemplate.delete("lock");

    }
}

给锁添加过期时间,防止死锁

在redis下使用命令:EXPIPE lock 20

(使用ttl lock查看死亡时间)

while( !redisTemplate.opsForValue().setIfAbsent("lock","111",3,TimeUnit.SECONDS)){

一开始就需要设置存活时间

set key value ex 3 nx指令

防误删除 使用 UUID uuid = UUID.randomUUID();

@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
    UUID uuid = UUID.randomUUID();
    //重试:递归调用
    while( !redisTemplate.opsForValue().setIfAbsent("lock", uuid.toString(),3,TimeUnit.SECONDS)){
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    try {
        //            redisTemplate.expire("lock",3, TimeUnit.SECONDS);
        //1,查询库存信息
        String stock = redisTemplate.opsForValue().get("stock").toString();

        //2.判断库存是否充足
        if(stock != null && stock.length() !=0){
            Integer integer = Integer.valueOf(stock);
            if(integer>0){
                //3. 扣减库存
                redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
            }
        }
    } finally {
        // 与uuid相同则解锁
        String lock = redisTemplate.opsForValue().get("lock");
        if(StringUtils.equals(uuid.toString(),lock)){
            redisTemplate.delete("lock");
        }
    }
}

2.3 Lua脚本

在我们的这一步中,可能会导致,判断结束,还没来得及删除,就发生了

String lock = redisTemplate.opsForValue().get("lock");
if(StringUtils.equals(uuid.toString(),lock)){
redisTemplate.delete("lock");
}

因此,我们需要保证这一步的原子性。

此时我们要做到判断删除一步解决

Lua教程:https://www.runoob.com/lua/lua-tutorial.html

  1. 判断是否是自己的锁,如果是自己的锁,执行删除操作

    if redis.call('get',KEYS[1])==ARGV[1]
    then
        return redis.call('del',KEYS[1])
    else
        return 0
    end
    
    if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
    

3. 基于MySQL的分布式锁

基于MySQL关系型数据库实现:

redis:基于Key唯一值

zk:基于 znode 节点唯一性。

MySQL可以根据:唯一键索引

3.1 实现

两张表:

再给表设计一个唯一键索引,如下

思路:

  1. 加锁:INSERT INTo tb_lock(lock_name) values (‘lock’)执行成功代表获取锁成功

  2. 释放锁:获取锁成功的请求执行业务操作,执行完成之后通过delete删除对应记录

  3. 重试:递归、或循环操作

修改deduct方法

@Autowired(required = false)
private StringRedisTemplate redisTemplate;
@Autowired
private LockMapper lockMapper;
public void deduct(){
    try {
        Lock lock = new Lock();
        lock.setLockName("lock");
        lockMapper.insert(lock);
        //1. 查询库存信息
        String stock = redisTemplate.opsForValue().get("stock").toString();

        //2. 判断库存是否充足
        if (stock != null && stock.length() != 0) {
            Integer st = Integer.valueOf(stock);
            if (st > 0) {
                redisTemplate.opsForValue().set("stock", String.valueOf(--st));
            }
        }
        lockMapper.deleteById(lock.getId());
    }
    //解锁
    catch (Exception e){
        e.printStackTrace();
        //重试
        try{
            Thread.sleep(50);
            deduct();
        }
        catch (Exception e1){
            e1.printStackTrace();
        }
    }
}

测试发现性能很差

实现思路与方法:

  1. 独占排他互斥使用 唯一键索引

  2. 防死锁:(服务器有可能宕机或者人为i而修改)

    添加一个lock_time在表中,调用时与当前时间进行对比,如果已经超过预期时间自动删除

  3. 不可重入:

    修改为可重入:记录服务信息,及线程信息,重入次数

  4. 防误删:

    借助lock_name唯一性

  5. 原子性:
    一个写操作、或MySQL的悲观锁

  6. 可重入:

  7. 自动续期:

    通过一个定时任务来保证我们的任务完成后才能被删除

  8. 单机故障:

    搭建MySQL主备

    集群情况下锁机制失效问题

  9. 阻塞锁

4. 总结

  1. 简易程序:MySQL>redis(Lua脚本)>zk
  2. 性能:redis > zk > mysql
  3. 可靠性: zk>redis=mysqkl
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-10-08 20:48:45  更:2022-10-08 20:49:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 5:21:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码