IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 27.订单服务-RabbitMQ -> 正文阅读

[大数据]27.订单服务-RabbitMQ

RabbitMQ延时队列--实现定时任务

消息的TTL:消息的存活时间

RabbitMQ可以分别对队列和消息设置TTL:

  • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做设置,超过了这个时间,我们就认为消息死了,称之为死信
  • 如果队列设置了,消息也设置了,会取小的时间,所以一个消息如果被路由到不同的队列中,这个消息的死亡时间有可能不一样(因为队列的TTL不一致)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键,可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者都是一样的效果。

如果一个消息在满足如下条件下,会进死信路由:

  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用
  • 上面消息的TTL到了,消息过期了
  • 队列的长度被限制满了,排在前面的消息被丢弃或者扔到死信路由上

我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息被路由指定到某一个交换机,结合二者,就可以实现一个延时队列,总结来说我们就是要使用死掉的消息来完成延时队列。

一般设定队列的TTL来完成延时队列功能,如果设置消息延时的话,假如有三个消息分别是5分钟,1分钟,2分钟,由于RMQ的惰性检查机制,他检查第一个消息发现是五分钟后过期,服务器就会五分钟之后再过来取走消息,导致后面两个短时间的消息都要五分钟后才能取出来

订单模块具体延时流程(一旦队列创建,更改设置无法更新,必须先删除了之后再进行重新创建)

?测试流程可用性:

订单模块创建交换机,绑定关系,队列,消费者

package com.wuyimin.gulimall.order.config;
/**
 * @ Author wuyimin
 * @ Date 2021/8/29-15:59
 * @ Description
 */
@Configuration
public class MyMQConfig {
    @RabbitListener(queues = "order.release.order.queue")//消费者
    public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {
        System.out.println("收到过期的订单信息:准备关闭订单"+orderEntity);
        //手动签收消息(拿到原生消息,选择不批量告诉)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
    //创建绑定关系,队列和交换机的便捷方式
    @Bean
    public Queue orderDelayQueue(){
        HashMap<String, Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange","order-event-exchange");//死信路由
        map.put("x-dead-letter-routing-key","order.release.order");//死信的路由键
        map.put("x-message-ttl",60000);//消息过期时间一分钟
        //队列名字,是否持久化,是否排他(只能被一个连接使用),是否自动删除
        return new Queue("order.delay.queue",true,false,false,map);
    }
    @Bean
    public Queue orderReleaseOrderQueue(){
        return new Queue("order.release.order.queue",true,false,false);
    }
    @Bean
    public Exchange orderEventExchange(){
        //名字,是否持久化,是否自动删除 Topic交换机可以绑定多个队列
        return new TopicExchange("order-event-exchange",true,false);
    }
    @Bean
    //两个绑定关系
    public Binding orderCreateOrder(){
        return new Binding("order.delay.queue", Binding.DestinationType.QUEUE,
                "order-event-exchange","order.create.order",null);
    }
    @Bean
    public Binding orderReleaseOrder(){
        return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE,
                "order-event-exchange","order.release.order",null);
    }
}

创建测试接口

package com.wuyimin.gulimall.order.web;

/**
 * @ Author wuyimin
 * @ Date 2021/8/26-9:53
 * @ Description
 */
@Controller
public class HelloController {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @ResponseBody
    @GetMapping("/test/createorder")
    public String createOrderTest(){
        OrderEntity orderEntity=new OrderEntity();
        orderEntity.setOrderSn(UUID.randomUUID().toString());
        orderEntity.setModifyTime(new Date());
        //给mq发送消息
        rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",orderEntity);
        return "ok";
    }
}

测试结果

创建业务交换机和队列

库存模块延时具体流程:

库存模块整合MQ

导入依赖:

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置文件:

应用注解:

?消息转换Json配置,创建交换机,队列,绑定关系

package com.wuyimin.gulimall.ware.config;

/**
 * @ Author wuyimin
 * @ Date 2021/8/25-18:58
 * @ Description 返回Json配置
 */
@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();//返回消息转成json
    }
    //交换机
    @Bean
    public Exchange stockEventExchange(){
        return new TopicExchange("stock-event-exchange",true,false,null);
    }
    //普通队列用于解锁库存
    @Bean
    public Queue stockReleaseStockQueue(){
        return new Queue("stock.release.stock.queue",true,false,false,null);
    }
    //延迟队列
    @Bean
    public Queue stockDelayQueue(){
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "stock-event-exchange");
        arguments.put("x-dead-letter-routing-key", "stock.release");
        // 消息过期时间 2分钟
        arguments.put("x-message-ttl", 120000);
        return new Queue("stock.delay.queue",true,false,false,arguments);
    }
    /**
     * 交换机和延迟队列绑定
     */
    @Bean
    public Binding stockLockedBinding() {
        return new Binding("stock.delay.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.locked",
                null);
    }

    /**
     * 交换机和普通队列绑定
     */
    @Bean
    public Binding stockReleaseBinding() {
        return new Binding("stock.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "stock-event-exchange",
                "stock.release.#",
                null);
    }
}

?随便创建一个消费者,然后就可以看到创建好的队列和交换机了

库存的自动解锁功能

库存解锁的场景

1.下订单成功,订单过期没有支付被系统自动取消,被用户手动取消

2.下订单成功,库存锁定成功,但是接下来的业务调用失败导致订单回滚,之前锁定的库存就要自动解锁

创建一个库存系统发送库存锁定消息的To(id其实并没有用到。。。)

package com.wuyimin.common.to.mq;
/**
 * @ Author wuyimin
 * @ Date 2021/8/29-19:08
 * @ Description 库存锁定成功的To
 */
@Data
public class StockLockedTo {
private Long id;
private StockDetailTo detailTo;//工作单详情的所有id
}
package com.wuyimin.common.to.mq;

/**
 * @ Author wuyimin
 * @ Date 2021/8/29-19:18
 * @ Description
 */
@Data
public class StockDetailTo {
    private Long id;
    /**
     * sku_id
     */
    private Long skuId;
    /**
     * sku_name
     */
    private String skuName;
    /**
     * 购买个数
     */
    private Integer skuNum;
    /**
     * 工作单id
     */
    private Long taskId;
    /**
     * 仓库id
     */
    private Long wareId;
    /**
     * 1-已锁定  2-已解锁  3-扣减
     */
    private Integer lockStatus;
}

记得之前我们为order服务配置了一个拦截器,导致其他服务远程调用order服务会被拦截(因为需要登录),这显然是不合理的,所以我们方向远程调用

package com.wuyimin.gulimall.order.interceptor;
/**
 * @ Author wuyimin
 * @ Date 2021/8/26-10:52
 * @ Description 拦截未登录用户
 */
@Component //放入容器中
public class LoginUserInterceptor implements HandlerInterceptor {
    public static ThreadLocal<MemberRespVo> loginUser=new ThreadLocal<>();//方便其他请求拿到
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String uri=request.getRequestURI();
        //放行远程调用,匹配/order/order/status/**的uri直接放行
        boolean match = new AntPathMatcher().match("/order/order/status/**", uri);
        if(match){
            return true;
        }
        HttpSession session = request.getSession();//获取session
        MemberRespVo attribute = (MemberRespVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
        if(attribute!=null){
            //已经登录
            loginUser.set(attribute);
            return true;
        }else{
            //给前端用户的提示
            request.getSession().setAttribute("msg","请先进行登录");
            //未登录
            response.sendRedirect("http://auth.gulimall.com/login.html");//重定向到登录页
            return false;
        }
    }
}

来创建一个consumer监听消费消息

package com.wuyimin.gulimall.ware.listener;

/**
 * @ Author wuyimin
 * @ Date 2021/8/29-20:53
 * @ Description
 */

@Service
public class StockReleaseListener {
    @Autowired
    WareSkuService wareSkuService;
    //这里我放在类方法上报错了
    @RabbitListener(queues = "stock.release.stock.queue")//监听队列
    /*
    处理消息的方法(解锁)
     */
    public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
        System.out.println("进入了方法");
        try {
            System.out.println("收到了消息开始处理。。。");
            wareSkuService.handleStockLockedRelease(to);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//不选择批量回复
        } catch (Exception e) {
            System.out.println("拒收了消息。。。");
            //有异常就让他重新回队
           channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
}

对于wareService的处理消息方法

 /*
    处理消息的方法(解锁)
     */
    @Override
    public void handleStockLockedRelease(StockLockedTo to) {
        System.out.println("收到了解锁库存的消息");
        StockDetailTo detailTo = to.getDetailTo();//详细信息
        Long detailToId = detailTo.getId();//具体工作单的id
        /*
        解锁的两种情况:根据我们拿到的To,我们取查询数据库,看是否能得到订单的锁定库存结果
        如果没有:库存锁定失败了,库存回滚,无需解锁,无需操作
        如果有:说明是库存以下的10/0错误,这种时候需要回滚
         */
        WareOrderTaskDetailEntity byId = wareOrderTaskDetailService.getById(detailToId);
        if(byId!=null){
            //解锁--到此库存系统一切正常,但是不确定订单是什么情况
            //订单情况1:没有这个订单(订单回滚),必须解锁
            //订单情况2:有这个订单,要看订单状态(如果是已取消:就可以取解锁库存,其他任何状态都不可以解锁库存)
            Long orderId = to.getId();//拿到订单的id
            WareOrderTaskEntity orderTaskEntity = wareOrderTaskService.getById(orderId);
            String orderSn = orderTaskEntity.getOrderSn();//订单号,我们需要拿着这个订单取查询订单状态
            R r = orderFeignService.getOrderByOrderSn(orderSn);
            if(r.getCode()==0){
                //远程调用成功
                Integer status = r.getData(new TypeReference<Integer>() {
                });
                if(status==null||status==4){
                    //订单已经被取消了,订单不存在, 解锁库存
                    if(byId.getLockStatus()==1){//当前具体工作单必须是未解锁状态才行
                        unlockStock(detailTo.getSkuId(),detailTo.getWareId(),detailTo.getSkuNum(),detailTo.getTaskId());
                    }
                }
            }else{
                //远程服务失败
                throw new RuntimeException("远程服务失败,消息消费失败");
            }
        }
    }

中间的远程调用方法,注意这里不能return null过去,即使没有数据也是一种成功的情况,如果返回null,那么调用方会觉得是远程调用失败了,而不是订单回滚了

    @GetMapping("/status/{orderSn}")
    public R getOrderByOrderSn(@PathVariable("orderSn") String orderSn){
        OrderEntity order_sn = orderService.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
        if(order_sn!=null){
            Integer status = order_sn.getStatus();
            return R.ok().setData(status);
        }
        else return R.ok().setData(null);
    }

解锁库存的方法

   /*
    只要解锁库存的消息失败,一定要告诉服务器此次消息解锁是失败的,启用手动的ack机制
   */
    //解锁的方法
    private void unlockStock(Long skuId, Long wareId, Integer num, Long taskId){
        wareSkuDao.unlockStock(skuId,wareId,num);
        //更新库存工作单的状态
        WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();
        entity.setId(taskId);
        entity.setLockStatus(2);//已经解锁
        wareOrderTaskDetailService.updateById(entity);
    }

unlockStock方法(具体落实到数据库的逻辑)

    <update id="unlockStock">
        update wms_ware_sku set stock_locked=stock_locked-#{num}
        where sku_id=#{skuId} and ware_id=#{wareId}
    </update>

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-30 12:07:06  更:2021-08-30 12:08:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 15:44:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码