1分布式锁
锁:就是对代码块背后的资源进行锁定,在同一时间段只允许有一个线程访问修改
常用的线程安全机制:
1、sychronized
?? jvm 自带的锁, 可以重入,没有超时时间,不能被外部释放 ? jdk 8对sychronized? 优化,性能更加友好 ? 线程间通信? sychronized? 和 wait() notify() 配合使用
2、lock(ReentrantLock可重入锁)
lock 是基于java 代码实现的乐观锁 ,底层用到cas + aqs ? 可以重入,由超时机制,可以被外部释放 使用起来更加灵活,可以避免死锁 线程间通信 使用 Condition? 中的 await() signal() 两个程序配合使用?
3、cas
1.读取a 得值(旧值) 2.比较并替换 ? cas(旧值,要修改的值) ? ??? 如果旧值 和 此时此刻 的值 相等 ,则将 要修改的值 赋值给该变量a 3.修改变量a? 的值 为要修改的值 ? 如果旧值和当前值不相等,说明被其他线程修改过 ? 原子类: 线程的安全的自增 自减? 底层就是 cas 机制 AutomicInteger AutomicFloat Automic* ???
4、ThreadLocal
ThreadLocal 就是线程副本,会为每一个线程对该变量存储一个副本,每个线程都访问自己的变量 ?
5、volatile:不可以保证线程安全
只能保证 数据的可见性 ,禁止指令重排序
接下来咱们在一个例子中体会分布式锁
2.例子
秒杀例子
秒杀商品:
1.判断库存是否足够
2.库存充裕 则
生成订单,扣减库存
3.返回结构,生成订单
?
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
?
import javax.xml.ws.soap.Addressing;
?
@RestController
public class GoodsController {
?
?
? @Autowired
? private StringRedisTemplate redisTemplate;
?
? /**
? ? * 初始化 库存 订单
? ? *
? ? * 将库存和 存储到 redis
? ? * @param goods
? ? * @return
? ? */
? @RequestMapping("/init")
? public String init(String goods){
?
? ? ? // 初始化库存
? ? ? redisTemplate.opsForValue().set("stack_"+goods, 1000+"");
?
? ? ? // 初始化订单
? ? ? redisTemplate.opsForValue().set("order_"+goods, 0+"");
?
?
? ? ? return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
? ? ? ? ? ? ? "--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
? }
?
?
? /**
? ? * 秒杀商品
? ? * @param goods
? ? * @return
? ? */
? @RequestMapping("/killGoods")
? public synchronized String killGoods(String goods){
?
? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
?
? ? ? int stack = Integer.valueOf(stackStr);
?
? ? ? // 1.判断库存
? ? ? if (stack<1){// 说明没有库存
?
? ? ? ? ? return "很遗憾商品已售罄"+goods;
? ? ? }
?
? ? ? // 生成订单 订单加1 自增1
? ? ? redisTemplate.opsForValue().increment("order_"+goods);
?
? ? ? try {
? ? ? ? ? Thread.sleep(10);
? ? ? } catch (InterruptedException e) {
? ? ? ? ? e.printStackTrace();
? ? ? }
?
? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
?
?
? ? ? return "当前抢购商品成功"+goods;
?
? }
?
?
? /**
? ? * 获取当前订单商品 状态
? ? * @param goods
? ? * @return
? ? */
? @GetMapping("/getState")
? public String getState(String goods){
? ? ? return "当前商品:"+goods +"--库存:"+ redisTemplate.opsForValue().get("stack_"+goods)+
? ? ? ? ? ? ? "--订单:"+ redisTemplate.opsForValue().get("order_"+goods);
?
?
? }
}
?
获取订单状态
?
秒杀
?
使用工具进行秒杀
下载ab压力测试
ab -n 请求数 -c 并发数 访问的路径
ab ? -n 5000 -c 20 http://localhost:8080/killGoods?goods=p50
?
?
可以使用 synchronized 解决线程安全
?
?
多台机器同时访问
synchronized无法解决多应用线程问题
?
?
3.使用分布式锁解决线程安全
使用redis 解决分布式线程安全问题
1.引入redis 锁
导入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
</parent>
<dependencies>
<!--springBoot 相关 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- 引入redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
?application.properties的创建
spring.redis.host=8.130.166.101
spring.redis.port=6379
?
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
?
import java.util.concurrent.TimeUnit;
?
@Component
public class RedisLock {
?
?
? @Autowired
? public StringRedisTemplate redisTemplate;
?
? /**
? ? * 尝试 获取锁
? ? * @param key
? ? * @param value
? ? * @param time
? ? * @return
? ? */
? public boolean lock(String key,String value,int time){
?
? ? // 通过 setnx 来判断key 的标记位是否存在
? ? // 如果存在说明别人已经 持有锁
? ? // ? ? 不存在 创建,并获得锁
? ? return ? redisTemplate.opsForValue().setIfAbsent(key,value,time, TimeUnit.SECONDS);
? }
?
?
? /**
? ? * 释放锁
? ? * @param key
? ? * @return
? ? */
? public boolean unlock(String key){
?
? ? return ? redisTemplate.delete(key);
? }
?
}
?
2.使用
?
? @Autowired
? private RedisLock redisLock;
?
? @RequestMapping("/redisKillGoods")
? public synchronized String redisKillGoods(String goods){
?
?
? ? ? if (redisLock.lock("lock_"+goods,"0",5)){// 拿到锁
?
? ? ? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
? ? ? ? ? int stack = Integer.valueOf(stackStr);
?
? ? ? ? ? // 1.判断库存
? ? ? ? ? if (stack<1){// 说明没有库存
?
? ? ? ? ? ? ? // 释放锁
? ? ? ? ? ? ? redisLock.unlock("lock_"+goods);
?
? ? ? ? ? ? ? return "很遗憾商品已售罄"+goods;
? ? ? ? ? }
?
? ? ? ? ? // 生成订单 订单加1 自增1
? ? ? ? ? redisTemplate.opsForValue().increment("order_"+goods);
?
? ? ? ? ? try {
? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? }
?
? ? ? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
?
? ? ? ? ? // 释放锁
? ? ? ? ? redisLock.unlock("lock_"+goods);
?
? ? ? ? ? return "当前抢购商品成功"+goods;
? ? ? }else {
?
? ? ? ? ? return "很遗憾没有抢到宝贝"+goods;
? ? ? }
?
?
? }
使用redis setnx 完成分布式锁缺陷
1.setnx 本身是两条命令 a.set值 b.配置其过期时间
2.如果redis 是主从模式,有可能造成 主从数据延迟问题
3.setnx 有时间限制,如果在使用锁期间,key时间过期了,也会造成安全问题?
解决方案:
1.使用 redis 调用lua 脚本操作 setnx 保证原子性 操作复杂 ? 2.我们使用 redlock (红锁) 我们通过多个key 保证一个分布式锁,通过投票决定是否获取锁,配置的key还拥有看门狗机制(可以查看key是否即将过期,如果快要过期,可以续命) 解决 遇到到 1,2,3 问题 ?
使用zookeeper 实现分布式锁
1.引入zk 依赖
?
? ? ?
<dependency>
? ? ? ? ? <groupId>org.apache.zookeeper</groupId>
? ? ? ? ? <artifactId>zookeeper</artifactId>
? ? ? ? ? <version>3.6.0</version>
? ? ? ? ? <exclusions>
? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? <artifactId>slf4j-api</artifactId>
? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId>
? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? <artifactId>slf4j-log4j12</artifactId>
? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId>
? ? ? ? ? ? ? </exclusion>
? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? <artifactId>log4j</artifactId>
? ? ? ? ? ? ? ? ? <groupId>log4j</groupId>
? ? ? ? ? ? ? </exclusion>
? ? ? ? ? </exclusions>
? ? ? </dependency>
?
? ? ? <dependency>
? ? ? ? ? <groupId>org.apache.curator</groupId>
? ? ? ? ? <artifactId>curator-recipes</artifactId>
? ? ? ? ? <version>4.0.1</version>
? ? ? ? ? <exclusions>
? ? ? ? ? ? ? <exclusion>
? ? ? ? ? ? ? ? ? <artifactId>slf4j-api</artifactId>
? ? ? ? ? ? ? ? ? <groupId>org.slf4j</groupId>
? ? ? ? ? ? ? </exclusion>
? ? ? ? ? </exclusions>
? ? ? </dependency>
?
2.容器加入zk
?
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
?
@Configuration
public class ZkConfig {
?
?
? /**
? ? * 在容器中加入CuratorFramework
? ? *
? ? * CuratorFramework zk 客户端
? ? * @return
? ? */
? @Bean
? public CuratorFramework getCuratorFramework(){
?
? ? ? // 当客户端 和 服务端 连接异常时,会发起重试 每隔2s 重试1次,总共试 3次
? ? ? RetryPolicy retryPolicy = new ExponentialBackoffRetry(2000,3);
?
? ? ? CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
? ? ? ? ? ? ? .retryPolicy(retryPolicy).connectString("192.168.12.145:2181;192.168.12.145:2182;192.168.12.145:2183").build();
?
?
? ? ? curatorFramework.start();
?
? ? ? return curatorFramework;
? }
?
?
?
}
?
3.使用zk 分布式锁
? @Autowired
? private CuratorFramework curatorFramework;
?
? /**
? ? * 基于zk 实现的分布式锁
? ? * @param goods
? ? * @return
? ? */
? @RequestMapping("/zkKillGoods")
? public synchronized String zkKillGoods(String goods) throws Exception {
?
? ? ? // 封装zk 的分布式锁 相当于 jvm synchronized 中的监视器
? ? ? InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework,"/"+goods);
?
? ? ? // interProcessMutex.acquire(5, TimeUnit.SECONDS) 去获取锁,若果没有获取到,就等待5s,还没有则放弃
? ? ? if (interProcessMutex.acquire(5, TimeUnit.SECONDS)){// 拿到锁
?
? ? ? ? ? String stackStr = redisTemplate.opsForValue().get("stack_"+goods);
? ? ? ? ? int stack = Integer.valueOf(stackStr);
?
? ? ? ? ? // 1.判断库存
? ? ? ? ? if (stack<1){// 说明没有库存
?
? ? ? ? ? ? ? // 释放锁
? ? ? ? ? ? ? interProcessMutex.release();
?
? ? ? ? ? ? ? return "很遗憾商品已售罄"+goods;
? ? ? ? ? }
?
? ? ? ? ? // 生成订单 订单加1 自增1
? ? ? ? ? redisTemplate.opsForValue().increment("order_"+goods);
?
? ? ? ? ? try {
? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? }
?
? ? ? ? ? redisTemplate.opsForValue().set("stack_"+goods,(stack-1)+"");
?
? ? ? ? ? // 释放锁
? ? ? ? ? interProcessMutex.release();
?
? ? ? ? ? return "当前抢购商品成功"+goods;
? ? ? }else {
?
? ? ? ? ? return "很遗憾没有抢到宝贝"+goods;
? ? ? }
?
?
? }
?
总结:
使用zk 作为分布锁,比较可靠,但是效率太低,一般适用于并发量不大的场合
如果并发量太高,zk 就是最大的性能瓶颈
|