Redis实战:黑马点评之优惠券秒杀
1 全局唯一ID
1.1全局唯一ID
每个店铺都可以发布优惠券:
当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
全局ID生成器,是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
常见的全局唯一ID的生成策略有:
- UUID:16进制的字符串ID,可以做唯一ID,但不支持自增
- Redis 自增
- snowflake 雪花算法:long 类型的64ID,性能更好,但是比较依赖于时钟,如果时间不准确,可能会出现异常问题
- 数据库自增:单独创建一张表,在表中记录自增的数值
这里我们使用redis来完成全局ID生成器的制作,因为redis可以很容易的满足以上特性:
- 唯一性:redis是独立于数据库之外的,因此我们不用担心不同数据库中id重复的问题
- 高可用:可以通过部署redis集群实现高可用
- 高性能:redis是基于内存存储的,性能比较高
- 递增性:redis的String类型中有incrby命令,我们可以用其来实现id的递增
- 安全性:为了增加ID的安全性,我们可以不直接通过Redis自增来生成id,而是在自增的同时拼接一些其它信息:
通过redis生成的id组成结构如下:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
1.2 Redis实现全局唯一Id
工具类代码如下:
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1663491103L;
private static final int COUNT_BITS = 32;
@Autowired
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix) {
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
return timestamp << COUNT_BITS | count;
}
}
我们可以根据以下代码来模拟多线程环境下id的生成速度
@Test
void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
long id = redisIdWorker.nextId("order");
System.out.println("id = " + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time = " + (end - begin));
}
在这里我们用到了countdownlatch,countdownlatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题
我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch
CountDownLatch 中有两个最重要的方法:countDown和await
await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。
测试结束后,我们可以看到redis中生成的计数信息,说明我们通过刚刚的测试生成了30000个id
3 添加优惠卷
每个店铺都可以发布优惠券,分为平价券和特价券。平价券可以任意购买,而特价券需要秒杀抢购:
tb_voucher:优惠券的基本信息,优惠金额、使用规则等 tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
关于以上两个表,有几个点需要注意:
- 平价券只需要在tb_voucher中保存信息即可,而特价券不仅需要在tb_voucher中保存信息,还需要在tb_seckill_voucher保存信息
- 在tb_voucher中通过type字段来区分一个优惠券是平价券还是特价券。
- tb_seckill_voucher的主键使用的是特价券在tb_voucher中的id
关于新增优惠券的代码,在基础代码中已经完成了,我们只需简单阅读一下即可
**新增普通卷代码: **VoucherController
@PostMapping
public Result addVoucher(@RequestBody Voucher voucher) {
voucherService.save(voucher);
return Result.ok(voucher.getId());
}
新增秒杀卷代码:
VoucherController
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
voucherService.addSeckillVoucher(voucher);
return Result.ok(voucher.getId());
}
VoucherServiceImpl
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
save(voucher);
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
我们可以通过postman,往表中插入一张秒杀券,以便后续功能的测试,注意这里的beginTime和endTime一定要改在当前时间之后,否则前端页面会无法显示
{
"shopId": 1,
"title": "100元代金券",
"subTitle": "周一到周日均可使用",
"rules": "全场通用\\n无需预约\\n可无限叠加\\不兑现、不找零\\n仅限堂食",
"payValue": 8000,
"actualValue": 10000,
"type": 1,
"stock": 100,
"beginTime":"2022-09-18T17:42:00",
"endTime":"2022-09-19T23:40:00"
}
4 实现秒杀下单基本代码
下单核心思路:当我们点击抢购时,会触发右侧的请求,我们只需要编写对应的controller即可
秒杀下单时需要判断两点:
- 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
- 库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件,比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id,如果有一个条件不满足则直接结束。
VoucherOrderController代码编写如下:
@RestController
@RequestMapping("/voucher-order")
public class VoucherOrderController {
@Autowired
private IVoucherOrderService voucherService;
@PostMapping("seckill/{id}")
public Result seckillVoucher(@PathVariable("id") Long voucherId) {
return voucherService.seckillVoucher(voucherId);
}
}
VoucherOrderServiceImpl代码编写如下:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if(seckillVoucher == null){
return Result.fail("秒杀券不存在");
}
LocalDateTime now = LocalDateTime.now();
if(now.isBefore(seckillVoucher.getBeginTime())||now.isAfter(seckillVoucher.getEndTime())){
return Result.fail("不在活动时间内");
}
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
5 库存超卖问题
5.1库存超卖问题分析
关于库存数量的判断和库存的扣减,在我们原有代码中是这么写的:
if (voucher.getStock() < 1) {
return Result.fail("库存不足!");
}
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
return Result.fail("库存不足!");
}
这样其实是有线程安全问题的,假设当库存数量只有1时,线程1执行查询库存,判断库存大于0,于是准备去扣减库存,但是在扣减库存之前,线程2也执行了查询库存的操作,也发现库存大于零,那么这两个线程最终都会去扣减库存,而此时库存中商品的数量只有1,此时就会出现库存的超卖问题,即库存变成负数
超卖问题是典型的多线程安全问题,针对这一问题的常见解决方案就是加锁:而对于加锁,我们通常有两种解决方案:见下图:
这里我们使用乐观锁解决库存超卖问题,而乐观锁也有以下两种实现方式:
-
版本号法 :所谓版本号法就是在数据表中新增一个version字段,这个字段的值就是该行数据的版本号,当我们执行修改操作时,让版本号+1,这样,在多线程并发的时候,我们就可以基于版本号来判断数据有没有被修改过 例如,此时有线程1和线程2两个线程同时来访问库存,线程1先执行,在查询库存时会将库存和版本号一并查询出来,假设此时库存和版本号都为1,当线程1判断库存大于0并准备去执行扣减操作时,线程2开始执行了,同样的,线程2也会去查询库存和版本号,得到的结果也都为1,当线程2准备去执行扣减操作时,线程1的扣减操作已经开始执行了,线程1在执行扣减操作时会判断此时的版本号是否与之前查询出来的版本号一致,即版本号是否为1,经过判断发现是一致的,线程1就会开始执行扣减操作,库存减一,同时版本号加一。当线程2开始执行扣减操作时,也会去判断此时的版本号是否与之前查询出来的一致,之前线程2查询出来的版本号是1,但此时版本号已经变成2了,线程2的更新操作就会失败,这样也就解决了超卖的问题。
-
CAS法:CAS 法,即比较和替换法,是在版本号法的基础上改进而来 以我们当前的业务为例,我们发现库存和版本号是同时查而且同时发生变化的,当我们查询库存时会将版本号一并查询出来,而当库存减一时版本号也会加一,这种情况下,我们就可以用库存数量代替版本号来判断当前数据是否发生变化 例如,此时有线程1和线程2两个线程同时来访问库存,线程1先执行,在查询库存时只将库存查询出来,假设此时库存为1,当线程1判断库存大于0并准备去执行扣减操作时,线程2开始执行了,同样的,线程2也会去查询库存,得到的结果也是1,当线程2准备去执行扣减操作时,线程1的扣减操作已经开始执行了,线程1在执行扣减操作时会判断此时的库存数量是否与之前查询出来的库存数量一致,即是否为1,经过判断发现是一致的,线程1就会开始执行扣减操作,库存减一。当线程2开始执行扣减操作时,也会去判断此时的库存数量是否与之前查询出来的一致,之前线程2查询出来的库存数量是1,但此时版本号已经变成0了,线程2的更新操作就会失败,这样也就解决了超卖的问题。
5.2 乐观锁解决超卖问题
修改代码方案一
这种方案是基于CAS法实现的,我们可以将VoucherOrderServiceImpl 在扣减库存时执行的sql语句改写成:
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId)
.eq("stock",seckillVoucher.getStock()).update();
以上逻辑的核心含义是:只要扣减库存时的库存和之前查询出来的库存是一致的,就意味着没有其他线程修改过库存,那么此时就是线程安全的
以上这种方式虽然保证了库存不会超卖,但是库存充足的情况下会出现许多扣减库存失败的情况,失败的原因在于:如果在同一时间有多个线程拿到了相同数量的库存,那么这些线程中最多只会有一条线程能扣减库存成功,因为只要有一条线程将库存修改了,那么其他所有拿到相同数量库存的线程在进行库存数量的判断时都会发现库存数量已经被修改了,导致这些线程扣减库存执行失败,哪怕此刻库存仍然十分充足
修改代码方案二
基于上述方案失败的经验,我们可以将sql语句改写成:
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).gt("stock",0).update();
也就是说,只要stock > 0,就允许线程修改库存,由于这里的判断是交给数据库进行的,而数据库在执行更新操作时是会为数据加上行锁的,因此就不用担心会发生并发问题。
6 一人一单问题
发行优惠券的目的是为了引流,但是目前的情况是,每个人都可以无限制的对优惠券进行抢购,所以我们应该修改一下当前的业务逻辑,让一个用户只能对同一个优惠券下单一次。
具体业务逻辑如下:
在VoucherOrderServiceImpl中修改代码:
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if(seckillVoucher == null){
return Result.fail("秒杀券不存在");
}
LocalDateTime now = LocalDateTime.now();
if(now.isBefore(seckillVoucher.getBeginTime())||now.isAfter(seckillVoucher.getEndTime())){
return Result.fail("不在活动时间内");
}
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
Integer count = query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已经购买过了!");
}
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
上述代码在多线程环境下也可能会出现线程安全问题,实际上,以下代码
Integer count = query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已经购买过了!");
}
与库存超卖的原因类似,这种判断在多线程环境下几乎趋近于摆设,一个用户同样能够完成多次下单,因此我们需要对这些代码进行优化,在解决库存超卖问题时,我们使用的是乐观锁,但在这里由于是查询操作,因此我们只能选择悲观锁。
初始方案是将下单的操作封装成一个createVoucherOrder方法,同时为了确保线程安全,在方法上添加一把synchronized 锁
@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if(seckillVoucher == null){
return Result.fail("秒杀券不存在");
}
LocalDateTime now = LocalDateTime.now();
if(now.isBefore(seckillVoucher.getBeginTime())||now.isAfter(seckillVoucher.getEndTime())){
return Result.fail("不在活动时间内");
}
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
return createVoucherOrder(voucherId);
}
@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {
Integer count = query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已经购买过了!");
}
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
但是按照上述方式添加锁,锁的粒度太粗,在使用锁过程中,控制锁粒度是一件非常重要的事情,而直接在方法上添加synchronized会使用当前类对象作为锁对象,由于当前类对象在ioc容器中是单例的,所以在高并发环境下,多个线程共用一把锁,线程串行执行,严重影响效率
那么我们应该使用什么作为锁呢?让我们回归到业务上来分析,由于我们当前希望的是一个用户最多只能下单一次,那么我们就可以使用当前用户id来作为锁,这样就能在控制锁粒度的同时保证线程安全,具体代码如下:
@Transactional
public Result createVoucherOrder(Long voucherId) {
synchronized (UserHolder.getUser().getId().toString().intern()) {
Integer count = query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已经购买过了!");
}
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if(!success){
return Result.fail("库存不足");
}
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}
}
上述代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果我们在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放,这种情况下其他事务就会抢到锁然后执行方法,这时候由于事务还未提交,当前用户在数据库中仍然是没有订单信息的,此时就会出现重复下单的情况。
为了解决上述问题,我们必须要将锁的范围扩大到整个事务,那这应该怎样操作呢?
我们可以针对seckillVoucher调用createVoucherOrder方法的代码进行加锁,如下所示:
@Override
public Result seckillVoucher(Long voucherId) {
SeckillVoucher seckillVoucher = seckillVoucherService.getById(voucherId);
if(seckillVoucher == null){
return Result.fail("秒杀券不存在");
}
LocalDateTime now = LocalDateTime.now();
if(now.isBefore(seckillVoucher.getBeginTime())||now.isAfter(seckillVoucher.getEndTime())){
return Result.fail("不在活动时间内");
}
if(seckillVoucher.getStock() < 1){
return Result.fail("库存不足");
}
synchronized (UserHolder.getUser().getId().toString().intern()) {
return createVoucherOrder(voucherId);
}
}
但是以上做法仍然有问题,由于我们在调用createVoucherOrder方法时,调用者是this,而我们知道spring控制事务的原理是通过创建类的代理对象来调用方法,而在这里方法的调用者并非是代理对象,因此会出现事务失效的情况。
这里我们的解决方案是获取当前类的父接口的代理对象,并由代理对象来执行createVoucherOrder方法
synchronized (UserHolder.getUser().getId().toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
为了上述代码的正常运行,我们还需要做几件事
在IVoucherOrderService接口中创建createVoucherOrder方法:
public interface IVoucherOrderService extends IService<VoucherOrder> {
Result seckillVoucher(Long voucherId);
Result createVoucherOrder(Long voucherId);
}
在pom文件中导入一下依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
在启动类上打上注解@EnableAspectJAutoProxy
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
@EnableAspectJAutoProxy(exposeProxy = true)
public class HmDianPingApplication {
public static void main(String[] args) {
SpringApplication.run(HmDianPingApplication.class, args);
}
}
这下就没问题了
7 集群环境下的并发问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
1.我们将服务启动两份,端口分别为8081和8082:
2.然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
3.重新加载nginx,命令为nginx.exe -s reload
4.经过测试,最终发现在集群模式下,有多少个服务,用户最多就能下多少单,也就是说在集群模式下,我们之前使用的悲观锁失效了
为什么会出现上述现象呢?原因是由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,而每个jvm内部有一个锁监视器,用来记录当前获取锁的线程id与锁对象,假设在服务器A的tomcat内部,有两个线程分别为线程1、线程2,这两个线程由于在同一个jvm上运行,且锁对象是同一个,当线程1获取锁对象时就会被锁监视器记录下来,此时线程2就无法获取锁对象了,这样也就实现了互斥锁。
但是如果现在是服务器B的tomcat内部,又有两个线程分别为线程3、线程4,这两个线程虽然和线程1、线程2运行着同样的代码,但是却是在不同的jvm上运行的,这也就意味着它们拥有不同的锁监视器和不同的锁对象,即便在第一个jvm中,锁监视器已经获取到了线程1的id,但是在第二个jvm中,锁监视器仍然是空的,这也就意味着线程3和线程4都能去获得锁,这样的话,在代码中看起来好像只有一把锁,实际上在不同的服务器上却是不同的锁,线程3与线程4能实现互斥,但是却无法和线程1与线程2实现互斥,这就是集群环境下,syn锁失效的原因。
在这种情况下,我们就需要使用分布式锁来解决这个问题。
|