分布式场景问题
商品超卖问题
这个问题是做项目时候学习到的,例如下单流程:同一个节点中的并发访问安全可以使用synchnized 来保证线程的安全,但是不同节点在同一个业务这是无法保证安全的。
怎么解决分布式并发问题
使用redis实现分布式的锁
将上面的步骤进行一个拆分:
总的来说就是在做这些操作之前一定需要对每一个商品进行上锁,这样其他节点就无法操作。
使用Redis实现
这里使用项目中的真实案例来实现这个过程。一个下单流程代码如下:
public Map<String,String> addOrder(String cids, Orders order) {
Map<String,String> map = new HashMap<>();
String[] split = cids.split(",");
List<Integer> list = new ArrayList<>();
for (String s : split) {
list.add(Integer.parseInt(s));
}
List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
boolean isLock = true;
String skuIds[] = new String[shoppingCartVOS.size()];
for (int i = 0; i < shoppingCartVOS.size(); i++) {
String skuId = shoppingCartVOS.get(i).getSkuId();
Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent("sheep");
if (ifAbsent){
skuIds[i] = skuId;
}
isLock = isLock && ifAbsent;
}
if(isLock){
try {
shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
boolean stockFlag = true;
for (ShoppingCartVO cartVO : shoppingCartVOS) {
if(Integer.parseInt(cartVO.getCartNum()) > cartVO.getSkuStock()){
stockFlag = false;
break;
}
}
if(stockFlag){
map.put("orderId",orderId);
map.put("productNames",untitled);
return map;
}
}catch (Exception e){
e.printStackTrace();
}finally {
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)){
redisTemplate.delete(skuId);
}
}
}
} else {
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)){
redisTemplate.delete(skuId);
}
}
}
return null;
}
这里依然还有几个问题:
- 如果订单中部分商品加锁成功,但是某一个加锁失败就需要全部释放
- 如果加锁之后,出现了异常导致这个线程销毁,锁谁来释放?
解决锁无法释放问题
这里根据上面的描述不难理解其实可以通过Redis中设置超时时间来解决,这样到点必定会释放锁资源。
但是这样又出现了另一个问题,假如有线程 t1 和 t2 ,t1加锁设置超时时间,然后由于一些特殊原因运行比较慢,没有处理完业务锁过期了,此时 t2 进行加锁,但是 t1 处理完后会释放锁,这样会把 t2 的给释放掉,那么其他线程又可以进行操作造成并发问题。如图:
解决t1释放t2锁问题
根据上面问题,其实是可以联想到,释放锁的时候去检查这个锁是不是我自己锁住的。上面的代码中不难发现,加锁的时候value是固定的,可以从这里入手,使用一个map集合将这个加锁的商品保存起来,key为加锁的商品id,value是随机生成的字符串(尽可能保证唯一即可)。释放的时候就可以从redis获取,如果有值并且值就是我自己生成的那个字符串才进行释放。
代码根据上面的业务代码进行改造,可以这样设计:
boolean isLock = true;
String skuIds[] = new String[shoppingCartVOS.size()];
Map<String,String> values = new HashMap<>();
for (int i = 0; i < shoppingCartVOS.size(); i++) {
String skuId = shoppingCartVOS.get(i).getSkuId();
String uuid = UUID.randomUUID().toString();
Boolean ifAbsent = redisTemplate.boundValueOps(skuId).setIfAbsent(uuid,10, TimeUnit.SECONDS);
if (ifAbsent){
skuIds[i] = skuId;
values.put(skuId,uuid);
}
isLock = isLock && ifAbsent;
}
if(isLock){}
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)){
String value = redisTemplate.boundValueOps(skuId).get();
if (value != null && map.get(skuId).equals(value)){
redisTemplate.delete(skuId);
}
}
}
这里,又有一个问题,这一切看起来好像都完美,但是,释放锁这里依然会有并发问题,比如:
t1 在释放锁的时候if判断完成,正要执行删除锁的代码,结果锁过期了,然后别人拿到这个锁,结果 t1 还是可以释放掉别人的锁,这就是一个问题,这里就需要保证判断和释放应该同时操作,也就是这两个操作要具有原子性
使用lua脚本解决
首先在resource目录下创建脚本文件:unlock.lua,这个lua脚本的意思就是从redis中查询传过来的keys[1]内容的并且跟传过来的ARGV[1]是否相等,如下:
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
配置Bean加载lua脚本:
@Bean
public DefaultRedisScript<List> defaultRedisScript(){
DefaultRedisScript<List> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(List.class);
defaultRedisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("unlock.lua")));
return defaultRedisScript;
}
通过执?lua脚本解锁:
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = stringRedisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
System.out.println(rs.get(0));
业务代码释放锁测操作可以改为:(keys中包含skuId这个脚本会查询这个的value,后面的values.get(skuId)就是需要比较的值)
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)){
List<String> keys = new ArrayList<>();
keys.add(skuId);
List rs = redisTemplate.execute(defaultRedisScript,keys , values.get(skuId));
System.out.println(rs.get(0));
}
}
分布式锁框架-Redisson
看门狗机制
线程:?于给当前key延?过期时间,保证业务线程正常执?的过程中,锁不会过期。
说白了就是一个守护线程。
Redission介绍
Redisson在基于NIO的Netty框架上,充分的利?了Redis键值数据库提供的?系列优势,在Java实??具包中常?接?的基础上,为使?者提供了?系列具有分布式特性的常??具类。使得原本作为协调单机多线程并发程序的?具包获得了协调分布式多机多线程并发系统的能?,??降低了设计和研发?规模分布式系统的难度。同时结合各富特?的分布式服务,更进?步简化了分布式环境中程序相互之间的协作。
使用SpringBoot集成
首先引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>
然后配置如下:
redisson:
addr:
singleAddr:
host: redis://47.96.11.185:6370
password: 12345678
database: 0
配置RedissionClient:
@Configuration
public class RedissonConfig {
@Value("${redisson.addr.singleAddr.host}")
private String host;
@Value("${redisson.addr.singleAddr.password}")
private String password;
@Value("${redisson.addr.singleAddr.database}")
private int database;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().
setAddress(host).
setPassword(password).
setDatabase(database);
return Redisson.create(config);
}
}
其他使用配置
单机连接配置参考上面即可
集群连接配置如下:
redisson:
addr:
cluster:
host: redis://47.96.11.185:6370, ... ,redis://47.96.11.185:6373
password: 12345678
客户端配置:
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useClusterServers()
.addNodeAddress(hosts.split("[,]"))
.setPassword(password)
.setScanInterval(2000)
.setMasterConnectionPoolSize(10000)
.setSlaveConnectionPoolSize(10000);
return Redisson.create(config);
}
主从配置如下:
redisson:
addr:
masterAndSlave:
masterhost: redis://47.96.11.185:6370
slavehosts: redis://47.96.11.185:6371,redis://47.96.11.185:6372
password: 12345678
database: 0
客户端配置:
@Configuration
public class RedissonConfig3 {
@Value("${redisson.addr.masterAndSlave.masterhost}")
private String masterhost;
@Value("${redisson.addr.masterAndSlave.slavehosts}")
private String slavehosts;
@Value("${redisson.addr.masterAndSlave.password}")
private String password;
@Value("${redisson.addr.masterAndSlave.database}")
private int database;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useMasterSlaveServers()
.setMasterAddress(masterhost)
.addSlaveAddress(slavehosts.split("[,]"))
.setPassword(password)
.setDatabase(database)
.setMasterConnectionPoolSize(10000)
.setSlaveConnectionPoolSize(10000);
return Redisson.create(config);
}
至此几种Redis的模式都涉及到了
Redission使用‘
- 获取公平锁和非公平锁
RLock lock = redissonClient.getFairLock(skuId);
RLock lock = redissonClient.getLock(skuId);
- 加锁——阻塞和非阻塞锁
lock.lock();
lock.lock(20,TimeUnit.SECONDS);
boolean b = lock.tryLock(3,TimeUnit.SECONDS);
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);
lock.unlock();
- 公平非阻塞锁
RLock lock = redissonClient.getFairLock(skuId);
boolean b = lock.tryLock(3,20,TimeUnit.SECONDS);
至此几种基本的锁使用都涉及到了
上面业务代码的优化
首先先抽象出这个业务结构:
HashMap map = null;
加锁
try{
if(isLock){
校验库存
if(库存充?){
保存订单
保存快照
修改库存
删除购物?
map = new HashMap();
...
}
}
}catch(Exception e){
e.printStackTrace();
}finally{
释放锁
}
return map;
在上面的业务代码中就可以优化成这个写法
public Map<String,String> addOrder(String cids, Orders order) {
Map<String,String> map = null;
String[] split = cids.split(",");
List<Integer> list = new ArrayList<>();
for (String s : split) {
list.add(Integer.parseInt(s));
}
List<ShoppingCartVO> shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
boolean isLock = true;
String skuIds[] = new String[shoppingCartVOS.size()];
Map<String,RLock> locks = new HashMap<>();
for (int i = 0; i < shoppingCartVOS.size(); i++) {
String skuId = shoppingCartVOS.get(i).getSkuId();
try {
RLock lock = redissonClient.getFairLock(skuId);
boolean ifAbsent = false;
ifAbsent = lock.tryLock(3,20, TimeUnit.SECONDS);
if (ifAbsent){
skuIds[i] = skuId;
locks.put(skuId,lock);
}
isLock = isLock && ifAbsent;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(isLock){
try {
shoppingCartVOS = shoppingCartMapper.selectShopCartByCids(list);
boolean stockFlag = true;
String untitled = "";
for (ShoppingCartVO cartVO : shoppingCartVOS) {
if(Integer.parseInt(cartVO.getCartNum()) > cartVO.getSkuStock()){
stockFlag = false;
break;
}
untitled += cartVO.getProductName() + ",";
}
if(stockFlag){
map = new HashMap<>();
map.put("orderId",orderId);
map.put("productNames",untitled);
}
}catch (Exception e){
e.printStackTrace();
}finally {
for (int i = 0; i < skuIds.length; i++) {
String skuId = skuIds[i];
if (skuId != null && !"".equals(skuId)){
locks.get(skuId).unlock();
System.out.println("-----------------------unlock");
}
}
}
}
return map;
}
|