RabbitMQ延时队列--实现定时任务
消息的TTL:消息的存活时间
RabbitMQ可以分别对队列和消息设置TTL:
- 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做设置,超过了这个时间,我们就认为消息死了,称之为死信
- 如果队列设置了,消息也设置了,会取小的时间,所以一个消息如果被路由到不同的队列中,这个消息的死亡时间有可能不一样(因为队列的TTL不一致)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键,可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者都是一样的效果。
如果一个消息在满足如下条件下,会进死信路由:
- 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用
- 上面消息的TTL到了,消息过期了
- 队列的长度被限制满了,排在前面的消息被丢弃或者扔到死信路由上
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由指定到某一个交换机,结合二者,就可以实现一个延时队列,总结来说我们就是要使用死掉的消息来完成延时队列。
一般设定队列的TTL来完成延时队列功能,如果设置消息延时的话,假如有三个消息分别是5分钟,1分钟,2分钟,由于RMQ的惰性检查机制,他检查第一个消息发现是五分钟后过期,服务器就会五分钟之后再过来取走消息,导致后面两个短时间的消息都要五分钟后才能取出来
订单模块具体延时流程(一旦队列创建,更改设置无法更新,必须先删除了之后再进行重新创建)
?测试流程可用性:
订单模块创建交换机,绑定关系,队列,消费者
package com.wuyimin.gulimall.order.config;
/**
* @ Author wuyimin
* @ Date 2021/8/29-15:59
* @ Description
*/
@Configuration
public class MyMQConfig {
@RabbitListener(queues = "order.release.order.queue")//消费者
public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到过期的订单信息:准备关闭订单"+orderEntity);
//手动签收消息(拿到原生消息,选择不批量告诉)
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
//创建绑定关系,队列和交换机的便捷方式
@Bean
public Queue orderDelayQueue(){
HashMap<String, Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","order-event-exchange");//死信路由
map.put("x-dead-letter-routing-key","order.release.order");//死信的路由键
map.put("x-message-ttl",60000);//消息过期时间一分钟
//队列名字,是否持久化,是否排他(只能被一个连接使用),是否自动删除
return new Queue("order.delay.queue",true,false,false,map);
}
@Bean
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
@Bean
public Exchange orderEventExchange(){
//名字,是否持久化,是否自动删除 Topic交换机可以绑定多个队列
return new TopicExchange("order-event-exchange",true,false);
}
@Bean
//两个绑定关系
public Binding orderCreateOrder(){
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
"order-event-exchange","order.create.order",null);
}
@Bean
public Binding orderReleaseOrder(){
return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
"order-event-exchange","order.release.order",null);
}
}
创建测试接口
package com.wuyimin.gulimall.order.web;
/**
* @ Author wuyimin
* @ Date 2021/8/26-9:53
* @ Description
*/
@Controller
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
@ResponseBody
@GetMapping("/test/createorder")
public String createOrderTest(){
OrderEntity orderEntity=new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString());
orderEntity.setModifyTime(new Date());
//给mq发送消息
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
return "ok";
}
}
测试结果
创建业务交换机和队列
库存模块延时具体流程:
库存模块整合MQ
导入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件:
应用注解:
?消息转换Json配置,创建交换机,队列,绑定关系
package com.wuyimin.gulimall.ware.config;
/**
* @ Author wuyimin
* @ Date 2021/8/25-18:58
* @ Description 返回Json配置
*/
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();//返回消息转成json
}
//交换机
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,false,null);
}
//普通队列用于解锁库存
@Bean
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false,null);
}
//延迟队列
@Bean
public Queue stockDelayQueue(){
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue",true,false,false,arguments);
}
/**
* 交换机和延迟队列绑定
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
/**
* 交换机和普通队列绑定
*/
@Bean
public Binding stockReleaseBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
}
?随便创建一个消费者,然后就可以看到创建好的队列和交换机了
库存的自动解锁功能
库存解锁的场景
1.下订单成功,订单过期没有支付被系统自动取消,被用户手动取消
2.下订单成功,库存锁定成功,但是接下来的业务调用失败导致订单回滚,之前锁定的库存就要自动解锁
创建一个库存系统发送库存锁定消息的To(id其实并没有用到。。。)
package com.wuyimin.common.to.mq;
/**
* @ Author wuyimin
* @ Date 2021/8/29-19:08
* @ Description 库存锁定成功的To
*/
@Data
public class StockLockedTo {
private Long id;
private StockDetailTo detailTo;//工作单详情的所有id
}
package com.wuyimin.common.to.mq;
/**
* @ Author wuyimin
* @ Date 2021/8/29-19:18
* @ Description
*/
@Data
public class StockDetailTo {
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private Long wareId;
/**
* 1-已锁定 2-已解锁 3-扣减
*/
private Integer lockStatus;
}
记得之前我们为order服务配置了一个拦截器,导致其他服务远程调用order服务会被拦截(因为需要登录),这显然是不合理的,所以我们方向远程调用
package com.wuyimin.gulimall.order.interceptor;
/**
* @ Author wuyimin
* @ Date 2021/8/26-10:52
* @ Description 拦截未登录用户
*/
@Component //放入容器中
public class LoginUserInterceptor implements HandlerInterceptor {
public static ThreadLocal<MemberRespVo> loginUser=new ThreadLocal<>();//方便其他请求拿到
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String uri=request.getRequestURI();
//放行远程调用,匹配/order/order/status/**的uri直接放行
boolean match = new AntPathMatcher().match("/order/order/status/**", uri);
if(match){
return true;
}
HttpSession session = request.getSession();//获取session
MemberRespVo attribute = (MemberRespVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
if(attribute!=null){
//已经登录
loginUser.set(attribute);
return true;
}else{
//给前端用户的提示
request.getSession().setAttribute("msg","请先进行登录");
//未登录
response.sendRedirect("http://auth.gulimall.com/login.html");//重定向到登录页
return false;
}
}
}
来创建一个consumer监听消费消息
package com.wuyimin.gulimall.ware.listener;
/**
* @ Author wuyimin
* @ Date 2021/8/29-20:53
* @ Description
*/
@Service
public class StockReleaseListener {
@Autowired
WareSkuService wareSkuService;
//这里我放在类方法上报错了
@RabbitListener(queues = "stock.release.stock.queue")//监听队列
/*
处理消息的方法(解锁)
*/
public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
System.out.println("进入了方法");
try {
System.out.println("收到了消息开始处理。。。");
wareSkuService.handleStockLockedRelease(to);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//不选择批量回复
} catch (Exception e) {
System.out.println("拒收了消息。。。");
//有异常就让他重新回队
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
对于wareService的处理消息方法
/*
处理消息的方法(解锁)
*/
@Override
public void handleStockLockedRelease(StockLockedTo to) {
System.out.println("收到了解锁库存的消息");
StockDetailTo detailTo = to.getDetailTo();//详细信息
Long detailToId = detailTo.getId();//具体工作单的id
/*
解锁的两种情况:根据我们拿到的To,我们取查询数据库,看是否能得到订单的锁定库存结果
如果没有:库存锁定失败了,库存回滚,无需解锁,无需操作
如果有:说明是库存以下的10/0错误,这种时候需要回滚
*/
WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(detailToId);
if(byId!=null){
//解锁--到此库存系统一切正常,但是不确定订单是什么情况
//订单情况1:没有这个订单(订单回滚),必须解锁
//订单情况2:有这个订单,要看订单状态(如果是已取消:就可以取解锁库存,其他任何状态都不可以解锁库存)
Long orderId = to.getId();//拿到订单的id
WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getById(orderId);
String orderSn = orderTaskEntity.getOrderSn();//订单号,我们需要拿着这个订单取查询订单状态
R r = orderFeignService.getOrderByOrderSn(orderSn);
if(r.getCode()==0){
//远程调用成功
Integer status = r.getData(new TypeReference<Integer>() {
});
if(status==null||status==4){
//订单已经被取消了,订单不存在, 解锁库存
if(byId.getLockStatus()==1){//当前具体工作单必须是未解锁状态才行
unlockStock(detailTo.getSkuId(),detailTo.getWareId(),detailTo.getSkuNum(),detailTo.getTaskId());
}
}
}else{
//远程服务失败
throw new RuntimeException("远程服务失败,消息消费失败");
}
}
}
中间的远程调用方法,注意这里不能return null过去,即使没有数据也是一种成功的情况,如果返回null,那么调用方会觉得是远程调用失败了,而不是订单回滚了
@GetMapping("/status/{orderSn}")
public R getOrderByOrderSn(@PathVariable("orderSn") String orderSn){
OrderEntity order_sn = orderService.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
if(order_sn!=null){
Integer status = order_sn.getStatus();
return R.ok().setData(status);
}
else return R.ok().setData(null);
}
解锁库存的方法
/*
只要解锁库存的消息失败,一定要告诉服务器此次消息解锁是失败的,启用手动的ack机制
*/
//解锁的方法
private void unlockStock(Long skuId, Long wareId, Integer num, Long taskId){
wareSkuDao.unlockStock(skuId,wareId,num);
//更新库存工作单的状态
WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
entity.setId(taskId);
entity.setLockStatus(2);//已经解锁
wareOrderTaskDetailService.updateById(entity);
}
unlockStock方法(具体落实到数据库的逻辑)
<update id="unlockStock">
update wms_ware_sku set stock_locked=stock_locked-#{num}
where sku_id=#{skuId} and ware_id=#{wareId}
</update>
|