目录
一、秒杀流程
二、代码示例
1. 依赖及表结构
2. 秒杀下单
3. 本地处理队列
4. 事件对象
5. 消费者
6. 异常处理消费者
7. 事件生产者
8. 创建Disruptor
三、参考资料
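一、秒杀流程
整体流程:用户请求到达 Controller → Service 在 Redis 中预扣库存,成功后将秒杀事件写入 Redis 等待队列(WAIT_QUEUE)→ 后台线程把等待队列中的事件搬到本地队列,并记录到正在处理队列(PROCESSING_QUEUE)和镜像队列(IMAGE_QUEUE)→ Disruptor 生产者从本地队列取出事件并发布到 RingBuffer → 消费者依次创建订单、发送 Kafka 消息并扣减库存、移除正在处理队列中的元素;处理过程中抛出异常的事件会被写入失败队列(FAILED_QUEUE)。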
二、代码示例
1. 依赖及表结构
<!-- LMAX Disruptor library; supplies the com.lmax.disruptor.* classes used by the Java examples in this article. -->
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
-- 商品库存表
-- Goods stock table: stores a goods ID together with its stock count and unit price.
-- NOTE(review): `goods_id` is varchar(16) here but bigint(20) in `t_order` - confirm which type is intended.
-- NOTE(review): the (4) / (255) after bigint look like display widths rather than storage sizes - confirm they are intentional.
CREATE TABLE `t_goods_stock` (
`id` bigint(4) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`goods_id` varchar(16) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '商品ID',
`stock` bigint(255) NULL DEFAULT NULL COMMENT '库存',
`price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
-- 订单表
-- Order table: stores the buyer, the goods, the purchased quantity, unit price, total price and audit columns.
-- NOTE(review): `total` has the same decimal(8, 2) precision as `price`; a large price multiplied by `number` may not fit - confirm.
-- NOTE(review): `goods_id` is bigint(20) here but varchar(16) in `t_goods_stock` - confirm which type is intended.
CREATE TABLE `t_order` (
`id` bigint(64) NOT NULL AUTO_INCREMENT COMMENT '主键',
`user_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户ID',
`goods_id` bigint(20) NULL DEFAULT NULL COMMENT '商品ID',
`number` int(11) NULL DEFAULT NULL COMMENT '购买数量',
`price` decimal(8, 2) NULL DEFAULT NULL COMMENT '单价',
`total` decimal(8, 2) NULL DEFAULT NULL COMMENT '总价',
`create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建人',
`create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
`update_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '修改人',
`update_time` datetime(0) NULL DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 61686 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
2. 秒杀下单
a. Controller层
package com.common.instance.test.controller;
import com.common.instance.test.core.Response;
import com.common.instance.test.service.SeckillService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Random;
/**
 * @description REST entry point of the flash-sale (seckill) demo.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/23 15:47
 **/
@RestController
@RequestMapping("/seckill")
@Api(tags = "秒杀活动")
public class SeckillController {

    @Resource
    private SeckillService seckillService;

    /**
     * Tries to grab the given goods for a simulated user.
     *
     * <p>The user ID is derived from the current timestamp and the purchase
     * quantity is a random value in {@code [1, 10]}; both are demo stand-ins
     * for real request data.
     *
     * @param goodsId ID of the goods to grab; must not be null or blank
     * @return a success response carrying {@code true} when the request was accepted,
     *         {@code false} when it was not; an error response when {@code goodsId}
     *         is missing or the service call throws
     */
    @GetMapping("/isSeckillGoods")
    @ApiOperation("用户是否抢到商品")
    public Response<Boolean> isSeckillGoods(String goodsId) {
        // Reject a missing goods ID up front instead of building a Redis key from "null".
        if (goodsId == null || goodsId.trim().isEmpty()) {
            return Response.error();
        }

        Boolean isSeckill;
        try {
            // Simulated user: the timestamp is only a placeholder for a real user ID.
            String userId = String.valueOf(System.currentTimeMillis());
            // Simulated quantity in [1, 10]; ThreadLocalRandom avoids allocating a Random per request.
            Integer num = java.util.concurrent.ThreadLocalRandom.current().nextInt(1, 11);
            isSeckill = seckillService.isSeckillGoods(userId, goodsId, num);
        } catch (Exception e) {
            // NOTE(review): no logger is imported in this class; route this through the project logger.
            e.printStackTrace();
            return Response.error();
        }
        return Response.success(isSeckill);
    }
}
b. Service层
package com.common.instance.test.service.impl;
import com.common.instance.test.dao.TGoodsStockDao;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.RedisQueue;
import com.common.instance.test.entity.TGoodsStock;
import com.common.instance.test.service.SeckillService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * @description Flash-sale (seckill) service: pre-deducts stock in Redis and queues accepted requests.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/23 15:49
 **/
@Service
public class SeckillServiceImpl implements SeckillService {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private TGoodsStockDao tGoodsStockDao;

    /**
     * Attempts to reserve {@code num} units of the goods for the user.
     *
     * <p>The Redis stock counter is seeded from the database only when the key does
     * not exist yet. A decrement that drives the counter below zero is rolled back
     * and the request is rejected. The previous implementation re-seeded the counter
     * from the database whenever it went negative, which let a sold-out item be sold
     * again, because the database stock is reduced asynchronously.
     *
     * @param userId  ID of the requesting user
     * @param goodsId ID of the goods; {@code null} is rejected
     * @param num     quantity to reserve; {@code null} or non-positive values are rejected
     * @return {@code true} when the stock was reserved and the event was pushed to the
     *         wait queue, {@code false} otherwise
     */
    @Override
    public Boolean isSeckillGoods(String userId, String goodsId, Integer num) {
        // A non-positive quantity would otherwise increase the counter.
        if (Objects.isNull(goodsId) || Objects.isNull(num) || num <= 0) {
            return false;
        }

        String stockKey = RedisQueue.SECKILL_KEY + goodsId;
        if (!ensureStockCounter(stockKey)) {
            return false;
        }

        // Atomic pre-deduction in Redis.
        Long remaining = redisTemplate.opsForValue().decrement(stockKey, num);
        if (Objects.isNull(remaining)) {
            return false;
        }
        if (remaining < 0) {
            // Not enough stock: give the units back so smaller requests can still succeed.
            redisTemplate.opsForValue().increment(stockKey, num);
            return false;
        }

        // Build the seckill event and hand it to the wait queue.
        SeckillGoodsEvent seckillGoodsEvent = new SeckillGoodsEvent();
        seckillGoodsEvent.setUserId(userId);
        seckillGoodsEvent.setGoodsId(goodsId);
        seckillGoodsEvent.setNum(num);
        redisTemplate.opsForList().leftPush(RedisQueue.WAIT_QUEUE, seckillGoodsEvent);
        return true;
    }

    /**
     * Seeds the Redis stock counter from the database if the key is absent.
     *
     * @param stockKey Redis key of the stock counter
     * @return {@code true} when a counter exists after the call, {@code false} when no
     *         stock could be loaded from the database
     */
    private boolean ensureStockCounter(String stockKey) {
        if (Boolean.TRUE.equals(redisTemplate.hasKey(stockKey))) {
            return true;
        }
        // NOTE(review): the stock row is looked up by the fixed primary key 1L, not by goodsId - confirm.
        TGoodsStock goodsStock = tGoodsStockDao.selectByPrimaryKey(1L);
        if (Objects.isNull(goodsStock) || Objects.isNull(goodsStock.getStock())) {
            return false;
        }
        // setIfAbsent keeps concurrent first requests from overwriting each other's seed.
        redisTemplate.opsForValue().setIfAbsent(stockKey, goodsStock.getStock(), 1, TimeUnit.DAYS);
        return true;
    }
}
3. 本地处理队列
a. 定义Redis队列
package com.common.instance.test.disruptor.localQueue;
import org.springframework.stereotype.Component;
/**
 * @description Redis key names used by the seckill example (stock counter, lock and queues).
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/23 16:26
 **/
public final class RedisQueue {

    /** Prefix of the per-goods seckill stock counter key; the goods ID is appended. */
    public static final String SECKILL_KEY = "SECKILL:GOODS_ID:";

    /** Prefix of the seckill stock lock key. */
    public static final String SECKILL_LOCK = "SECKILL:LOCK:";

    /** Wait queue. */
    public static final String WAIT_QUEUE = "SECKILL:QUEUE:WAIT_QUEUE";

    /** Queue of events currently being processed. */
    public static final String PROCESSING_QUEUE = "SECKILL:QUEUE:PROCESSING_QUEUE";

    /** Queue of events whose processing failed. */
    public static final String FAILED_QUEUE = "SECKILL:QUEUE:FAILED_QUEUE";

    /** Mirror (image) queue. */
    public static final String IMAGE_QUEUE = "SECKILL:QUEUE:IMAGE_QUEUE";

    /** Constants holder; not meant to be instantiated. */
    private RedisQueue() {
    }
}
b. 本地处理队列
package com.common.instance.test.disruptor.localQueue;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * @description Local in-memory event queue fed from the Redis wait queue by a background thread.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/24 17:09
 **/
@Component
public class MyEventQueue {

    /** Pause, in milliseconds, when there is nothing to transfer or the local queue is full. */
    private static final long IDLE_SLEEP_MILLIS = 1000L;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    // Local event queue
    private LinkedBlockingQueue<SeckillGoodsEvent> myEventQueue;

    // Maximum number of elements kept in the local queue (enforced by the poller, not by the queue itself)
    private int capacity = 100000;

    /** Creates the local queue and starts the background poller. */
    @PostConstruct
    public void create() {
        createEventQueue();
        getWaitQueue();
    }

    private void createEventQueue() {
        myEventQueue = new LinkedBlockingQueue<>();
    }

    /** Starts the thread that moves events from the Redis wait queue to the local queue. */
    private void getWaitQueue() {
        new Thread(new MyThread(), "seckill-wait-queue-poller").start();
    }

    public LinkedBlockingQueue<SeckillGoodsEvent> getMyEventQueue() {
        return myEventQueue;
    }

    public void setMyEventQueue(LinkedBlockingQueue<SeckillGoodsEvent> myEventQueue) {
        this.myEventQueue = myEventQueue;
    }

    /**
     * Moves at most one event from the Redis wait queue into the local queue and
     * records it in the processing and image queues.
     *
     * @return {@code true} when an event was transferred, {@code false} when the local
     *         queue is full or the wait queue is empty
     */
    private boolean transferOneEvent() {
        if (myEventQueue.size() >= capacity) {
            return false;
        }
        Object o = redisTemplate.opsForList().rightPop(RedisQueue.WAIT_QUEUE);
        if (Objects.isNull(o)) {
            return false;
        }
        // Round-trip through JSON to turn the deserialized Redis value into a SeckillGoodsEvent.
        String event = JSON.toJSONString(o);
        SeckillGoodsEvent seckillGoodsEvent = JSON.parseObject(event, SeckillGoodsEvent.class);
        // Add to the local queue
        myEventQueue.offer(seckillGoodsEvent);
        // Add to the processing queue
        redisTemplate.opsForList().leftPush(RedisQueue.PROCESSING_QUEUE, seckillGoodsEvent);
        // Add to the image queue
        redisTemplate.opsForList().leftPush(RedisQueue.IMAGE_QUEUE, seckillGoodsEvent);
        return true;
    }

    /**
     * Sleeps for the given time.
     *
     * @param millis time to sleep in milliseconds
     * @return {@code false} when the thread was interrupted (the interrupt flag is restored)
     */
    private boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Poller loop. It keeps running after a runtime failure (for example a Redis error)
     * and stops only when the thread is interrupted. Previously an unchecked exception
     * terminated the thread for good and an interrupt was swallowed.
     */
    class MyThread implements Runnable {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                boolean transferred = false;
                try {
                    transferred = transferOneEvent();
                } catch (RuntimeException e) {
                    // NOTE(review): no logger is imported in this class; route this through the project logger.
                    e.printStackTrace();
                }
                // Back off when idle, full, or after a failure; leave the loop on interrupt.
                if (!transferred && !pause(IDLE_SLEEP_MILLIS)) {
                    return;
                }
            }
        }
    }
}
4. 事件对象
package com.common.instance.test.disruptor.event;
import lombok.Data;
/**
 * @description Seckill goods event carried through the queues and the Disruptor ring buffer.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/23 15:34
 **/
@Data
public class SeckillGoodsEvent {

    /** User ID. */
    private String userId;

    /** Goods ID. */
    private String goodsId;

    /** Quantity of goods purchased. */
    private Integer num;

    /** Order ID. */
    private Long orderId;
}
5. 消费者
a. 创建订单消费者
package com.common.instance.test.disruptor.consumer;
import com.common.instance.test.dao.TGoodsStockDao;
import com.common.instance.test.dao.TOrderDao;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.entity.TGoodsStock;
import com.common.instance.test.entity.TOrder;
import com.lmax.disruptor.EventHandler;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;
/**
 * @description Disruptor consumer that creates and stores the order for a seckill event.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/24 9:49
 **/
@Component
public class CreateOrderHandler implements EventHandler<SeckillGoodsEvent> {

    @Resource
    private TOrderDao tOrderDao;

    @Resource
    private TGoodsStockDao tGoodsStockDao;

    /**
     * Loads the goods record, inserts an order built from the event and writes the
     * new order ID back into the event.
     *
     * @param seckillGoodsEvent event being processed
     * @param sequence          ring buffer sequence of the event
     * @param endOfBatch        whether this is the last event of the current batch
     * @throws IllegalStateException when the goods record or its price is missing
     * @throws Exception             when the event cannot be processed
     */
    @Override
    public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long sequence, boolean endOfBatch) throws Exception {
        // NOTE(review): the goods row is looked up by the fixed primary key 1L, not by the event's goodsId - confirm.
        TGoodsStock tGoodsStock = tGoodsStockDao.selectByPrimaryKey(1L);
        if (tGoodsStock == null || tGoodsStock.getPrice() == null) {
            // Fail with context instead of a bare NullPointerException further down.
            throw new IllegalStateException(
                    "Goods stock record or price not found for goodsId=" + seckillGoodsEvent.getGoodsId());
        }
        // Assemble the order
        TOrder order = createOrder(seckillGoodsEvent, tGoodsStock);
        // Save the order
        tOrderDao.insert(order);
        // Pass the order ID on to later consumers via the event
        seckillGoodsEvent.setOrderId(order.getId());
    }

    /**
     * Builds the order entity for an event.
     *
     * @param seckillGoodsEvent source of user ID, goods ID and quantity
     * @param tGoodsStock       source of the unit price
     * @return a populated, not yet persisted order
     */
    private TOrder createOrder(SeckillGoodsEvent seckillGoodsEvent, TGoodsStock tGoodsStock) {
        TOrder order = new TOrder();
        order.setUserId(seckillGoodsEvent.getUserId());
        order.setGoodsId(Long.valueOf(seckillGoodsEvent.getGoodsId()));
        order.setNumber(seckillGoodsEvent.getNum());
        order.setPrice(tGoodsStock.getPrice());

        // Total = unit price x quantity
        BigDecimal quantity = BigDecimal.valueOf(order.getNumber().longValue());
        order.setTotal(order.getPrice().multiply(quantity));

        Date date = new Date();
        order.setCreateBy("admin");
        order.setCreateTime(date);
        order.setUpdateBy("admin");
        order.setUpdateTime(date);
        return order;
    }
}
b. 发送订单kafka消息
package com.common.instance.test.disruptor.consumer;
import com.alibaba.fastjson.JSONObject;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.lmax.disruptor.EventHandler;
import com.log.util.LogUtil;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
 * @description Disruptor consumer that publishes the seckill event to Kafka.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/24 11:05
 **/
@Component
public class KafkaHandler implements EventHandler<SeckillGoodsEvent> {

    public static final String TOPIC_TEST = "SeckillGoodsEvent";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Forwards every event to {@link #send(SeckillGoodsEvent)}. */
    @Override
    public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long sequence, boolean endOfBatch) throws Exception {
        send(seckillGoodsEvent);
    }

    /**
     * Serializes the event to JSON, sends it to {@link #TOPIC_TEST} and logs the
     * outcome of the send from a callback.
     *
     * @param seckillGoodsEvent event to publish
     */
    public void send(SeckillGoodsEvent seckillGoodsEvent) {
        final String payload = JSONObject.toJSONString(seckillGoodsEvent);
        kafkaTemplate.send(TOPIC_TEST, payload).addCallback(
                // Success path
                sendResult -> LogUtil.info(TOPIC_TEST + " - 生产者 发送消息成功:" + sendResult.toString()),
                // Failure path
                failure -> LogUtil.info(TOPIC_TEST + " - 生产者 发送消息失败:" + failure.getMessage()));
    }
}
c. 扣减库存
package com.common.instance.test.disruptor.consumer;
import com.common.instance.test.dao.TGoodsStockDao;
import com.common.instance.test.dao.TOrderDao;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.entity.TGoodsStock;
import com.common.instance.test.entity.TOrder;
import com.lmax.disruptor.EventHandler;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Objects;
/**
 * @description Disruptor consumer that reduces the stored goods stock by the ordered quantity.
 * @author tcm
 * @version 1.0.0
 * @date 2021/12/24 11:08
 **/
@Component
public class ReduceStockHandler implements EventHandler<SeckillGoodsEvent> {

    @Resource
    private TOrderDao tOrderDao;

    @Resource
    private TGoodsStockDao tGoodsStockDao;

    /**
     * Looks up the order referenced by the event and the goods stock row, then
     * writes back the stock minus the order quantity. Does nothing when either
     * record is missing.
     *
     * @param event      event carrying the order ID
     * @param sequence   ring buffer sequence of the event
     * @param endOfBatch whether this is the last event of the current batch
     */
    @Override
    public void onEvent(SeckillGoodsEvent event, long sequence, boolean endOfBatch) throws Exception {
        TOrder seckillOrder = tOrderDao.selectByPrimaryKey(event.getOrderId());
        TGoodsStock currentStock = tGoodsStockDao.selectByPrimaryKey(1L);

        // Nothing to deduct without both the order and the stock row.
        if (seckillOrder == null || currentStock == null) {
            return;
        }

        Long remaining = currentStock.getStock() - seckillOrder.getNumber();
        currentStock.setStock(remaining);
        tGoodsStockDao.updateByPrimaryKey(currentStock);
    }
}
d. 移除正处理队列元素
package com.common.instance.test.disruptor.consumer;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.RedisQueue;
import com.lmax.disruptor.*;
import com.log.util.LogUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @description Disruptor consumer that removes an entry from the Redis processing queue.
 * @author TCM
 * @version 1.0
 * @date 2021/12/25 17:14
 **/
@Component
public class RemoveProcessingQueueHandler implements EventHandler<SeckillGoodsEvent> {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Pops the rightmost element of the processing queue and logs it as JSON.
     *
     * @param seckillGoodsEvent event being processed (not read by this handler)
     * @param l                 ring buffer sequence, used as the log prefix
     * @param b                 end-of-batch flag (unused)
     */
    @Override
    public void onEvent(SeckillGoodsEvent seckillGoodsEvent, long l, boolean b) throws Exception {
        final Object removed = redisTemplate.opsForList().rightPop(RedisQueue.PROCESSING_QUEUE);
        final String removedJson = JSON.toJSONString(removed);
        LogUtil.info(l + "removeProcessing: " + removedJson);
    }
}
6. 异常处理消费者
package com.common.instance.test.disruptor.consumer;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.RedisQueue;
import com.lmax.disruptor.ExceptionHandler;
import com.log.util.LogUtil;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @description Disruptor异常捕捉
* @author TCM
* @version 1.0
* @date 2021/12/25 19:39
**/
@Component
public class MyExceptionHandler implements ExceptionHandler<SeckillGoodsEvent> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public void handleEventException(Throwable ex, long sequence, SeckillGoodsEvent seckillGoodsEvent) {
try {
// 移除正在处理队列事件
redisTemplate.opsForList().rightPop(RedisQueue.PROCESSING_QUEUE);
// 添加到失败队列事件
redisTemplate.opsForList().leftPush(RedisQueue.FAILED_QUEUE, seckillGoodsEvent);
// 记录处理异常
LogUtil.error("MyExceptionHandler.handleEventException()", (Exception) ex);
} catch (Exception e) {
LogUtil.error("MyExceptionHandler.handleEventException()",
"PROCESSING_QUEUE into FAILED_QUEUE failed: " + JSON.toJSONString(seckillGoodsEvent),
e);
}
}
@Override
public void handleOnStartException(Throwable ex) {
LogUtil.error("MyExceptionHandler.handleOnStartException()", (Exception) ex);
}
@Override
public void handleOnShutdownException(Throwable ex) {
LogUtil.error("MyExceptionHandler.handleOnShutdownException()", (Exception) ex);
}
}
7. 事件生产者
package com.common.instance.test.disruptor.publisher;
import com.alibaba.fastjson.JSON;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.MyEventQueue;
import com.lmax.disruptor.EventTranslator;
import com.log.util.LogUtil;
import java.util.Objects;
/**
 * @description Copies the next event of the local queue into a RingBuffer slot.
 * @author TCM
 * @version 1.0
 * @date 2021/12/24 22:51
 **/
public class MyEventTranslator implements EventTranslator<SeckillGoodsEvent> {

    // Local event queue
    private final MyEventQueue myEventQueue;

    /**
     * @param myEventQueue holder of the local queue that events are polled from
     */
    public MyEventTranslator(MyEventQueue myEventQueue) {
        this.myEventQueue = myEventQueue;
    }

    /**
     * Fills the given ring buffer slot from the head of the local queue.
     *
     * <p>Slots are reused, so state left over from the slot's previous event is
     * cleared: the order ID is always reset, and when the local queue turns out
     * to be empty the remaining fields are cleared too, so an old event is never
     * published a second time.
     *
     * @param seckillGoodsEvent ring buffer slot to populate
     * @param l                 sequence of the slot
     */
    @Override
    public void translateTo(SeckillGoodsEvent seckillGoodsEvent, long l) {
        // Take the next event from the local queue
        SeckillGoodsEvent event = myEventQueue.getMyEventQueue().poll();

        // The order ID belongs to the slot's previous event; it is assigned again downstream.
        seckillGoodsEvent.setOrderId(null);

        if (Objects.nonNull(event)) {
            seckillGoodsEvent.setUserId(event.getUserId());
            seckillGoodsEvent.setGoodsId(event.getGoodsId());
            seckillGoodsEvent.setNum(event.getNum());
        } else {
            // Nothing was polled: blank the slot instead of republishing stale data.
            seckillGoodsEvent.setUserId(null);
            seckillGoodsEvent.setGoodsId(null);
            seckillGoodsEvent.setNum(null);
        }
        LogUtil.info(l + " seckillGoodsEvent: " + JSON.toJSONString(seckillGoodsEvent));
    }
}
8. 创建Disruptor
package com.common.instance.test.disruptor;
import com.common.instance.test.disruptor.consumer.*;
import com.common.instance.test.disruptor.event.SeckillGoodsEvent;
import com.common.instance.test.disruptor.localQueue.MyEventQueue;
import com.common.instance.test.disruptor.publisher.MyEventTranslator;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.log.util.LogUtil;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.Executors;
/**
* @description 创建Disruptor并启动
* @author TCM
* @version 1.0
* @date 2021/12/24 21:20
**/
@Component
public class SeckillDisruptor {
// 消费者:所有Handler处理异常记录
@Resource
private MyExceptionHandler myExceptionHandler;
// 消费者:创建订单
@Resource
private CreateOrderHandler createOrderHandler;
// 消费者:发送kafka
@Resource
private KafkaHandler kafkaHandler;
// 消费者:扣减库存
@Resource
private ReduceStockHandler reduceStockHandler;
// 消费者:移除正在处理的秒杀事件
@Resource
private RemoveProcessingQueueHandler removeProcessingQueueHandler;
@Resource
private MyEventQueue myEventQueue;
private Disruptor<SeckillGoodsEvent> disruptor;
@PostConstruct
public void init(){
LogUtil.info("SeckillDisruptor init");
// 创建Disruptor
disruptor = createDisruptor();
// 启动
new Thread(new StartDisruptorThread()).start();
}
// 创建Disruptor
public Disruptor<SeckillGoodsEvent> createDisruptor(){
// 创建Disruptor
Disruptor<SeckillGoodsEvent> disruptor = new Disruptor<SeckillGoodsEvent>(
// 预填充RingBuffer
new EventFactory(){
@Override
public Object newInstance() {
return new SeckillGoodsEvent();
}
},
// RingBuffer的大小(必须为2的N次方)
4096,
// 线程池处理
Executors.newWorkStealingPool(256),
// 生成类型:SINGLE(单生产者)、MULTI(多生产者)
ProducerType.SINGLE,
// 等待策略
new BlockingWaitStrategy());
// 消费异常处理
disruptor.setDefaultExceptionHandler(myExceptionHandler);
// 消费者
EventHandlerGroup<SeckillGoodsEvent> handlerGroup = disruptor.handleEventsWith(createOrderHandler);
handlerGroup.then(kafkaHandler, reduceStockHandler);
handlerGroup.then(removeProcessingQueueHandler);
return disruptor;
}
class StartDisruptorThread implements Runnable {
@Override
public void run() {
disruptor.start();
while (true) {
try {
// 本地队列没有秒杀事件,休眠50ms
if (myEventQueue.getMyEventQueue().isEmpty()) {
// LogUtil.info("本地事件队列没有可处理秒杀事件" + Thread.currentThread().getName());
Thread.sleep(1 * 50);
continue;
}
// 生产者
MyEventTranslator myEventTranslator = new MyEventTranslator(myEventQueue);
disruptor.publishEvent(myEventTranslator);
} catch (Exception e) {
e.printStackTrace();
continue;
}
}
}
}
}
三、参考资料
Disruptor 系列(二)使用场景 - 走看看
disruptor模拟高速处理大规模订单类业务场景_congge-CSDN博客
SpringBoot - 并发框架Disruptor使用详解2(多生产者、多消费者、消费者依赖关系)
使用Disruptor完成多个消费者不重复消费消息_tianyaleixiaowu的专栏-CSDN博客_disruptor 多消费者
|