IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Java 延迟队列 DelayQueue + “生产者-消费者“模式实现订单延迟处理 -> 正文阅读

[大数据]Java 延迟队列 DelayQueue + “生产者-消费者“模式实现订单延迟处理

??《Java 延迟队列 DelayQueue 的原理》一文,总结了延迟队列的原理,本文结合代码总结下基于延迟队列 DelayQueue ,采用 “生产者-消费者” 模式实现的订单延迟处理功能。
??延迟队列在实际开发中的应用要有几部分:实现了 Delayed 接口的消息体(本文中即为订单对象)、存放消息的延时队列、生产消息的生产者、消费消息的消费者。

一、订单对象

??存放到 DelayQueue 的元素必须实现 Delayed 接口,重写方法 compareTo 和 getDelay。Delayed 接口使对象成为延迟对象,它使存放在 DelayQueue 中的对象具有了延迟处理时间。订单对象(简化版)示例如下。

package com.test.jobs;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class QueueOrder implements Delayed {
    // 订单 ID
    private String orderId;
    // 订单金额
    private Integer orderAt;
    // 订单时间
    private String orderDt;

    // 到期时间,按照这个属性判断延时时长。
    private long endTime;

    // 根据此方法判断延迟任务是否到期。如果返回负数则说明到期,否则未到期
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 比较时间,将最接近执行时间的任务,排在最前面
     */
    @Override
    public int compareTo(Delayed delayed) {
        QueueOrder queueOrder = (QueueOrder) delayed;
        return (int) ((this.endTime - queueOrder.endTime));
    }

    // 直接设置到期时间
    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }
    // 此处省略 orderId、orderAt、orderDt 的 getter 和 setter 方法
}
二、延迟队列

??创建延迟队列,声明订单生产及消费方法,代码(简化版)如下。

package com.test.jobs;

import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.test.jobs.ApplicationContextHelper;
import com.test.jobs.QueueOrder;

import java.util.concurrent.DelayQueue;

public class DelayOrderResource {

    private static Logger logger = LoggerFactory.getLogger(DelayOrderResource.class);

    private DelayQueue<QueueOrder> delayQueue = new DelayQueue<>();

    public OrderService orderService = ApplicationContextHelper.getBean(OrderService.class);

    /**
     * 生产者向延迟队列中添加资源
     */
    public void produce() {
        try {
            // 获取一条待处理订单(此处是简化版的代码,实际项目中可据实获取待处理订单)
            QueueOrder queueOrder = orderService.getOrder();
            // 随便设置一个到期时间(毫秒),实际项目中可据不同场景设置不同的到期时间
            queueOrder.setEndTime(1651660951472l);
            delayQueue.put(queueOrder);
        } catch (Exception e) {
            logger.error("订单生产出现异常", e);
        }
    }

    /**
     * 将延迟队列中的订单移除并处理
     */
    public void consume() {
        try {
            QueueOrder queueOrder = delayQueue.take();
            logger.debug("延迟队列消费者处理订单:[{}]", JSON.toJSONString(queueOrder));
            orderService.dealOrder(queueOrder);
        } catch (Exception e) {
            logger.error("异常", e);
        }
    }
}

??订单处理服务类 OrderService 代码示例如下。

package com.test.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.test.jobs.QueueOrder;

import javax.annotation.Resource;

/**
 * 延迟队列订单生产-消费服务
 */
@Service
public class OrderService {
    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Resource
    private OrderMapper orderMapper;

    // 生产者生产订单
    @Transactional(value = "orderTransactionManager")
    public QueueOrder produceOrder() throws Exception {
        return orderMapper.getOrder();
    }

    // 消费者消费订单
    @Transactional(value = "orderTransactionManager")
    public void consumeOrder(QueueOrder queueOrder) {
        try {
            orderMapper.updOrder(queueOrder.getOrderId());
        } catch (Exception e) {
            logger.error("jobs 更新订单表处理失败", e);
        }
    }
}

package com.test.jobs;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * 获取上下文对象
 */
@Component
public class ApplicationContextHelper implements ApplicationContextAware {
    public static ApplicationContext applicationContext;

    public ApplicationContextHelper() {
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextHelper.applicationContext = applicationContext;
    }

    public static <T> T getBean(Class<T> clazz) {
        return applicationContext.getBean(clazz);
    }
}

三、生产者线程
package com.test.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 延迟订单生产者线程
 */
public class ProducerDelayQueueThread extends Thread {

    private Logger logger = LoggerFactory.getLogger(ProducerDelayQueueThread.class);

    private DelayOrderResource resource;

    public ProducerDelayQueueThread(DelayOrderResource resource) {
        this.resource = resource;
    }

    public void run() {
        logger.info("延迟队列生产者线程启动");
        while (true) {
            try {
                resource.produce();
            } catch (Exception e) {
                logger.error("延迟队列生产异常", e);
            }
        }
    }
}

四、消费者线程
package com.test.jobs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerDelayQueueThread extends Thread {
    private Logger logger = LoggerFactory.getLogger(ConsumerDelayQueueThread.class);

    private DelayOrderResource resource;

    public ConsumerDelayQueueThread(DelayOrderResource resource) {
        this.resource = resource;
    }

    public void run() {
        logger.info("延迟队列消费者线程启动");
        while (true) {
            try {
                resource.consume();
            } catch (Exception e) {
                logger.error("延迟队列消费异常", e);
            }
        }
    }
}

五、简单测试
    public static void main(String[] args) {
        DelayOrderResource resource = new DelayOrderResource();

        // 创建一个生产者线程
        ProducerDelayQueueThread p = new ProducerDelayQueueThread(resource);

        // 目前只开启一个生产者线程和一个消费者线程,后续可以改成线程池
        ConsumerDelayQueueThread c = new ConsumerDelayQueueThread(resource);

        // start 生产者和消费者线程,开始工作
        p.start();
        c.start();
    }
     
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-06 11:06:45  更:2022-05-06 11:09:09 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:04:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码