分布式锁(MySQL&Redis)
学习准备
gitee代码:https://gitee.com/naruto12138/distributed_lock.git
1. 传统锁
1.1搭建一个减库存的简单案例工具
创建一个Springboot测试项目
新建对象Stock
package com.example.distributedlock.pojo;
import lombok.Data;
@Data
public class Stock {
private Integer stock =5000;
}
Service:
package com.example.distributedlock.service;
import com.example.distributedlock.pojo.Stock;
import org.springframework.stereotype.Service;
@Service
public class StockService {
private Stock stock = new Stock();
public synchronized void deduct(){
stock.setStock(stock.getStock()-1);
System.out.println("库存余量"+stock.getStock());
}
}
controller:
package com.example.distributedlock.controller;
import com.example.distributedlock.service.StockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class StockController {
@Autowired
private StockService service;
@GetMapping("stock/deduct")
public String dedduct(){
service.deduct();
return "Hello";
}
}
使用jemter测试:
- 新建线程组
- 添加请求:
- 开启日志
点击启动
发现余量顺利减少到零
也可修改service:用ReentrantLock来加锁
package com.example.distributedlock.service;
import com.example.distributedlock.mapper.StockMapper;
import com.example.distributedlock.pojo.Stock;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import java.util.concurrent.locks.ReentrantLock;
@Service
public class StockService {
private Stock stock = new Stock();
private ReentrantLock lock = new ReentrantLock();
public void deduct(){
lock.lock();
try {
stock.setStock(stock.getStock() - 1);
System.out.println("库存余量" + stock.getStock());
}finally {
lock.unlock();
}
}
}
1.2 改造代码见MySQL中的库存
通常共享资源存在于服务外部,例如MySQL的数据
MySQL提供了乐观和悲观锁,但redis没有设置
为了方便,直接使用mybatis-plus
添加pom.xml依赖
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
创建数据库:
-
添加mapper package com.example.distributedlock.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.distributedlock.pojo.Stock;
public interface StockMapper extends BaseMapper<Stock> {
}
-
修改stock package com.example.distributedlock.pojo;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
@TableName("db_stock")
@Data
public class Stock {
private Long id;
private String productCode;
private String warehouse;
private Integer count;
}
-
修改service @Autowired
private StockMapper stockMapper;
public void deduct(){
try {
Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code","1001"));
if(stock !=null && stock.getCount()>0){
stock.setCount(stock.getCount() -1);
stockMapper.updateById(stock);
}
}
finally {
}
}
-
重新测试: 发送五千条 会发现: 出现了安全性问题
为什么会出现这样的问题呢?
我们的极限可能是:5000~9950其中的一个随机值
但如果我们对service像之前一样加锁会如何?
修改service层:
@Autowired
private StockMapper stockMapper;
private ReentrantLock lock = new ReentrantLock();
public void deduct(){
lock.lock();
try {
Stock stock = stockMapper.selectOne(new QueryWrapper<Stock>().eq("product_code","1001"));
if(stock !=null && stock.getCount()>0){
stock.setCount(stock.getCount() -1);
stockMapper.updateById(stock);
}
}
finally {
lock.unlock();
}
}
成功减少为4891
1.3 三种情况有可能导致MySQL锁机制失效
1.3.1 多例模式
将代码改装成多例模式
我们会发现速度明显提高:
锁机制失效!
事务:使用Read Uncommitted可以解决,但不推荐使用
1.3.2 集群部署(部署在多台服务器)
再通过nginx实现集群部署负载均衡,此时也会发生
1.3.3 存在事务
例如使用@Transactional回滚时,RC读取发生错误,修改的数据还未上传,此时就已经被读取。
1.4 通过MySQL语句自带锁解决
在对MySQL操作时update、insert、delete写操作本身就会加锁
使用语句:
UPDATE db_stock SET COUNT = COUNT - 1 WHERE product_code ='1001' AND COUNT >=1
在mapper上添加一个新update方法
package com.example.distributedlock.mapper;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.distributedlock.pojo.Stock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Update;
public interface StockMapper extends BaseMapper<Stock> {
@Update("UPDATE db_stock SET COUNT = COUNT - #{count} WHERE product_code =#{productCode} AND count >= #{count} ")
int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);
}
对Serveice进行修改
@Autowired
private StockMapper stockMapper;
private ReentrantLock lock = new ReentrantLock();
public void deduct(){
try {
stockMapper.updateStock("1001",1);
}
finally {
}
}
再用nginx进行负载均衡;
测试发现,正好减少了5000,且速度更快
1.5 SQL语句的优缺点
优点:上述三个问题都可以解决
缺点:
- 锁的范围问题
- 缺少逻辑性,具有局限性
- 无法记录库存前后的变化数量,再日志上不存在
锁的范围问题
悲观锁是一个行级锁,回
- 锁的查询或者更新条件必须是索引字段
- 查询或者更新条件必须是具体值
1.6 MySQL悲观锁使用select…update from
为了解决我们的悲观锁发生的问题
表中新添加一个数据
给mapper新添加一个方法使用select…update from语法实现悲观锁
package com.example.distributedlock.mapper;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.distributedlock.pojo.Stock;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import java.util.List;
public interface StockMapper extends BaseMapper<Stock> {
@Update("UPDATE db_stock SET COUNT = COUNT - #{count} WHERE product_code =#{productCode} AND count >= #{count} ")
int updateStock(@Param("productCode") String productCode,@Param("count") Integer count);
@Select("select * from db_stock where product_code=#{productCode} for update")
List<Stock> queryStock(String productCode);
}
修改Service
这里必须要使用@Transactional实现回滚
@Autowired
private StockMapper stockMapper;
@Transactional
public void deduct(){
try {
List<Stock> stocks = stockMapper.queryStock("1001");
Stock stock = stocks.get(0);
if(stock!=null && stock.getCount()>0){
stock.setCount(stock.getCount()-1);
stockMapper.updateById(stock);
}
} finally {
}
}
再重启两个服务器测试,通过nginx负载
最后用jemter测试,会发现速度比以往都慢很多(性能低),但成功实现了并发,并且更加灵活,并且可以记录库存变化前后的状态,方便我们的操作
1.7 select…update from存在的问题
-
如上所述,性能太差 -
会出现死锁,对多条数据进行操作时,需要注意加锁的顺序,保证加锁顺序一致
- 例如:有A、B两个对象,希望操作锁1、锁2
- A获取了锁1,未释放
- B获取了锁2,未释放
- 下一步A希望获取锁2、B希望获取锁1
- 此时发生死锁,A、B锁死
-
库存操作需要统一:select…update from操作表,则其他队表的操作最好不要使用select 此时锁失效,发生并发问题
1.8 MySQL乐观锁
时间戳/version版本号CAS机制
CAS: Compare And Swap 比较并交换
例如: A、B两个对象希望对一个数据α进行操作:
- A获取α和α的时间戳
- B获取α和α的时间戳
- B修改α,并更新α时间戳
- A希望更新α,但发现此前获取的时间戳与目前的时间戳不一致,A放弃更新,重新获取
- 直到A获取的时间戳与查询到的时间戳一致,则对α进行改变。
实现一个乐观锁
给表添加一个version的字段
直接修改service层: @Transactional需要注释掉,不可回滚会发生错误
@Autowired
private StockMapper stockMapper;
public void deduct(){
List<Stock> stocks = stockMapper.selectList(new QueryWrapper<Stock>().eq("product_code","1001"));
Stock stock = stocks.get(0);
if(stock != null && stock.getCount() > 0){
stock.setCount(stock.getCount() - 1);
Integer version = stock.getVersion();
stock.setVersion(version + 1);
if(stockMapper.update(stock,new UpdateWrapper<Stock>().
eq("id",stock.getId()).
eq("version",version))==0){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
deduct();
}
}
}
1.9 乐观锁存在的问题
-
高并发情况下,性能极低,而且是越往后越低!!! -
ABA问题 比如说查找时是A,但是有人修改成了B,但是又有人通过非法操作,返回成了A,导致版本故障。 -
读写分离情况下,导致乐观锁不可靠 读取会有一个缓存流,高强度下,乐观锁会发生错误
1.10 MySQL锁总结
性能:一个sql>悲观锁>JVM锁>乐观锁
2. 基于redis的分布式锁
添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
添加properties配置,默认端口号6379,应该没有人会改变吧
spring.redis.host=localhost
重写Sercvice的方法:
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
String stock = redisTemplate.opsForValue().get("stock");
if(stock != null && stock.length() > 0) {
Integer integer = Integer.valueOf(stock);
if(integer>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
}
}
}
再redis里设置stock为5000
压力测试一下,发现最后并没有减少到零
这时我们就需要进行锁操作
- jvm 、单例、synchronized解决,不再演示
- 使用redis乐观锁
- 分布式锁机制
2.1 redis乐观锁
watch指令:可以监控一个或者多个key的值,如果再事务(exec)执行之前,key的值发生变化则去校事务执行
multi:开启事务
exec:提交事务
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
redisTemplate.execute(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K,V> operations) throws DataAccessException {
operations.watch((K) "stock");
String stock = operations.opsForValue().get("stock").toString();
if(stock != null && stock.length() > 0) {
Integer integer = Integer.valueOf(stock);
if(integer>0){
operations.multi();
operations.opsForValue().set((K)"stock",(V)String.valueOf(--integer));
List<Object> exec = operations.exec();
if(exec == null|| exec.size()==0){
try {
Thread.sleep(100);
deduct();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return exec;
}
}
return null;
}
});
}
缺点:性能问题,redis的性能消耗过大!
2.2 redis 分布式锁
可以做到跨服务、跨进程、跨服务器
应用场景:
缓存击穿
MySQL是放在硬盘上的数据,为了访问便捷,我们可以使用redis来提高访问速度(通过缓存)
缓存击穿:一个热点key过期,导致系统奔溃。
过期时间:为了防止服务器因缓存过多而爆满。
当一个很热门的key过期了,大量的访问数据在redis找不到这个值,就到MySQL
里进行访问,此时MySQL难以承受而导致宕机。
解决方案:添加一个锁机制,对访问进行以此处理
此时,JVM锁十分不便,会因为一条请求而耽误其他的请求。
分布式锁的实现
- 基于redis实现
- 基于zookeeper/etcd实现
- 基于MySQL实现
特征:
-
独占排他使用
-
上锁 setnx -
解锁 del -
重试:
-
递归 -
循环 -
防止死锁 -
原子性: 获取锁和过期时间 -
防错误删除 先判断再删除。 在key中添加uuid -
自动续期 防止业务逻辑没有完全进行
首先我们先来使用递归
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");
if(!lock){
try {
Thread.sleep(20);
deduct();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else{
try {
String stock = redisTemplate.opsForValue().get("stock").toString();
if(stock != null && stock.length() !=0){
Integer integer = Integer.valueOf(stock);
if(integer>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
}
}
} finally {
redisTemplate.delete("lock");
}
}
}
成功!!
递归的坏处:引发栈内存溢出出错
,因此我们需要改成循环
CAS自旋锁!
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
while( !redisTemplate.opsForValue().setIfAbsent("lock", "111")){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
String stock = redisTemplate.opsForValue().get("stock").toString();
if(stock != null && stock.length() !=0){
Integer integer = Integer.valueOf(stock);
if(integer>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
}
}
} finally {
redisTemplate.delete("lock");
}
}
给锁添加过期时间,防止死锁
在redis下使用命令:EXPIPE lock 20
(使用ttl lock 查看死亡时间)
while( !redisTemplate.opsForValue().setIfAbsent("lock","111",3,TimeUnit.SECONDS)){
一开始就需要设置存活时间
即 set key value ex 3 nx 指令
防误删除 使用 UUID uuid = UUID.randomUUID();
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
public void deduct(){
UUID uuid = UUID.randomUUID();
while( !redisTemplate.opsForValue().setIfAbsent("lock", uuid.toString(),3,TimeUnit.SECONDS)){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
String stock = redisTemplate.opsForValue().get("stock").toString();
if(stock != null && stock.length() !=0){
Integer integer = Integer.valueOf(stock);
if(integer>0){
redisTemplate.opsForValue().set("stock",String.valueOf(--integer));
}
}
} finally {
String lock = redisTemplate.opsForValue().get("lock");
if(StringUtils.equals(uuid.toString(),lock)){
redisTemplate.delete("lock");
}
}
}
2.3 Lua脚本
在我们的这一步中,可能会导致,判断结束,还没来得及删除,就发生了
String lock = redisTemplate.opsForValue().get("lock");
if(StringUtils.equals(uuid.toString(),lock)){
redisTemplate.delete("lock");
}
因此,我们需要保证这一步的原子性。
此时我们要做到判断删除一步解决
Lua教程:https://www.runoob.com/lua/lua-tutorial.html
-
判断是否是自己的锁,如果是自己的锁,执行删除操作 if redis.call('get',KEYS[1])==ARGV[1]
then
return redis.call('del',KEYS[1])
else
return 0
end
if redis.call('get',KEYS[1])==ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
3. 基于MySQL的分布式锁
基于MySQL关系型数据库实现:
redis:基于Key唯一值
zk:基于 znode 节点唯一性。
MySQL可以根据:唯一键索引
3.1 实现
两张表:
再给表设计一个唯一键索引,如下
思路:
-
加锁:INSERT INTo tb_lock(lock_name) values (‘lock’)执行成功代表获取锁成功 -
释放锁:获取锁成功的请求执行业务操作,执行完成之后通过delete删除对应记录 -
重试:递归、或循环操作
修改deduct方法
@Autowired(required = false)
private StringRedisTemplate redisTemplate;
@Autowired
private LockMapper lockMapper;
public void deduct(){
try {
Lock lock = new Lock();
lock.setLockName("lock");
lockMapper.insert(lock);
String stock = redisTemplate.opsForValue().get("stock").toString();
if (stock != null && stock.length() != 0) {
Integer st = Integer.valueOf(stock);
if (st > 0) {
redisTemplate.opsForValue().set("stock", String.valueOf(--st));
}
}
lockMapper.deleteById(lock.getId());
}
catch (Exception e){
e.printStackTrace();
try{
Thread.sleep(50);
deduct();
}
catch (Exception e1){
e1.printStackTrace();
}
}
}
测试发现性能很差
实现思路与方法:
-
独占排他互斥使用 唯一键索引 -
防死锁:(服务器有可能宕机或者人为i而修改) 添加一个lock_time在表中,调用时与当前时间进行对比,如果已经超过预期时间自动删除 -
不可重入: 修改为可重入:记录服务信息,及线程信息,重入次数 -
防误删: 借助lock_name唯一性 -
原子性: 一个写操作、或MySQL的悲观锁 -
可重入: -
自动续期: 通过一个定时任务来保证我们的任务完成后才能被删除 -
单机故障: 搭建MySQL主备 集群情况下锁机制失效问题 -
阻塞锁
4. 总结
- 简易程序:MySQL>redis(Lua脚本)>zk
- 性能:redis > zk > mysql
- 可靠性: zk>redis=mysqkl
|