《Java 延迟队列 DelayQueue 的原理》一文总结了延迟队列的原理,本文结合代码,总结基于延迟队列 DelayQueue、采用“生产者-消费者”模式实现的订单延迟处理功能。延迟队列在实际开发中的应用主要包含几部分:实现了 Delayed 接口的消息体(本文中即为订单对象)、存放消息的延迟队列、生产消息的生产者、消费消息的消费者。
一、订单对象
存放到 DelayQueue 的元素必须实现 Delayed 接口,并重写 compareTo 和 getDelay 方法。Delayed 接口使对象成为延迟对象,使存放在 DelayQueue 中的对象具有延迟处理时间。订单对象(简化版)示例如下。
package com.test.jobs;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
 * Order message that can be stored in a {@code java.util.concurrent.DelayQueue}.
 *
 * <p>The order becomes available to consumers once {@link #getDelay(TimeUnit)}
 * is zero or negative, i.e. once the wall-clock time has reached the end time
 * set through {@link #setEndTime(long)}.
 */
public class QueueOrder implements Delayed {
    private String orderId;
    private Integer orderAt;
    private String orderDt;

    /** Expiry instant in epoch milliseconds (compared against System.currentTimeMillis()). */
    private long endTime;

    /**
     * Returns the remaining delay of this order.
     *
     * @param unit the unit the result is expressed in
     * @return the remaining delay; zero or negative once the end time has passed
     */
    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = this.endTime - System.currentTimeMillis();
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Orders elements so that the one expiring first sorts first.
     *
     * <p>The comparison uses {@link Long#compare(long, long)} instead of casting
     * the difference of two {@code long} values to {@code int}: that cast
     * truncates and flips the sign when the end times are far apart, which
     * breaks the ordering of the queue.
     *
     * @param delayed the element to compare with
     * @return a negative value, zero, or a positive value when this order
     *         expires before, at the same time as, or after {@code delayed}
     */
    @Override
    public int compareTo(Delayed delayed) {
        if (delayed == this) {
            return 0;
        }
        if (delayed instanceof QueueOrder) {
            return Long.compare(this.endTime, ((QueueOrder) delayed).endTime);
        }
        // Other Delayed implementations: fall back to comparing remaining delays.
        return Long.compare(
                getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
    }

    /**
     * Returns the identifier of this order.
     *
     * @return the order id, or {@code null} when it has not been populated
     */
    public String getOrderId() {
        return this.orderId;
    }

    /**
     * Sets the instant at which this order becomes available.
     *
     * @param endTime expiry instant in epoch milliseconds
     */
    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }
}
二、延迟队列
创建延迟队列,声明订单的生产及消费方法,代码(简化版)如下。
package com.test.jobs;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.test.jobs.ApplicationContextHelper;
import com.test.jobs.QueueOrder;
import java.util.concurrent.DelayQueue;
/**
 * Shared resource that owns the order delay queue and exposes the producer and
 * consumer operations used by the worker threads.
 */
public class DelayOrderResource {
    private static final Logger logger = LoggerFactory.getLogger(DelayOrderResource.class);

    /**
     * Hard-coded end time (epoch milliseconds) assigned to every produced order
     * in this simplified example. Once this instant is in the past, produced
     * orders are immediately available to {@link #consume()}.
     */
    private static final long ORDER_END_TIME_MILLIS = 1651660951472L;

    private final DelayQueue<QueueOrder> delayQueue = new DelayQueue<>();

    /** Order service looked up from the Spring context when this object is created. */
    public OrderService orderService = ApplicationContextHelper.getBean(OrderService.class);

    /**
     * Fetches one order from the order service, stamps its end time and puts it
     * into the delay queue. Failures are logged and not propagated.
     */
    public void produce() {
        try {
            QueueOrder queueOrder = orderService.produceOrder();
            if (queueOrder == null) {
                // DelayQueue does not accept null elements; skip instead of failing with an NPE.
                // NOTE(review): assumes the service may return null when no order is pending - confirm.
                logger.debug("no order returned by the order service, nothing to enqueue");
                return;
            }
            queueOrder.setEndTime(ORDER_END_TIME_MILLIS);
            delayQueue.put(queueOrder);
        } catch (Exception e) {
            logger.error("订单生产出现异常", e);
        }
    }

    /**
     * Blocks until an order's delay has expired, then hands it to the order
     * service. Failures, including an interruption while waiting, are logged
     * and not propagated.
     */
    public void consume() {
        try {
            QueueOrder queueOrder = delayQueue.take();
            logger.debug("延迟队列消费者处理订单:[{}]", JSON.toJSONString(queueOrder));
            orderService.consumeOrder(queueOrder);
        } catch (Exception e) {
            logger.error("异常", e);
        }
    }
}
订单处理服务类 OrderService 的代码示例如下。
package com.test.jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.test.jobs.QueueOrder;
import javax.annotation.Resource;
/**
 * Spring service that reads orders from, and writes order updates to, the
 * order mapper. Both operations run under the transaction manager named
 * {@code orderTransactionManager}.
 */
@Service
public class OrderService {

    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    /** Data-access component injected by the container. */
    @Resource
    private OrderMapper orderMapper;

    /**
     * Retrieves an order from the mapper.
     *
     * @return the order returned by {@code orderMapper.getOrder()}
     * @throws Exception declared for callers; any mapper failure propagates unchanged
     */
    @Transactional(value = "orderTransactionManager")
    public QueueOrder produceOrder() throws Exception {
        final QueueOrder fetchedOrder = orderMapper.getOrder();
        return fetchedOrder;
    }

    /**
     * Updates the order identified by the given queue element.
     *
     * <p>Any exception raised while updating is logged here and is not
     * rethrown, so the caller never observes a failure from this method.
     *
     * @param queueOrder the order whose id is passed to the mapper update
     */
    @Transactional(value = "orderTransactionManager")
    public void consumeOrder(QueueOrder queueOrder) {
        try {
            orderMapper.updOrder(queueOrder.getOrderId());
        } catch (Exception updateFailure) {
            logger.error("jobs 更新订单表处理失败", updateFailure);
        }
    }
}
package com.test.jobs;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
/**
 * Static access point to the Spring {@link ApplicationContext} for objects
 * that are not created by the container.
 *
 * <p>The context is stored in a static field when the container calls
 * {@link #setApplicationContext(ApplicationContext)}; {@link #getBean(Class)}
 * can only be used after that has happened.
 */
@Component
public class ApplicationContextHelper implements ApplicationContextAware {
    /** Context captured from the container; {@code null} until it has been injected. */
    public static ApplicationContext applicationContext;

    public ApplicationContextHelper() {
    }

    /**
     * Stores the supplied context in the static field.
     *
     * @param applicationContext the context to store
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextHelper.applicationContext = applicationContext;
    }

    /**
     * Looks up a bean of the given type in the stored context.
     *
     * @param clazz the type of the bean to look up
     * @param <T> the bean type
     * @return the bean returned by the context for {@code clazz}
     * @throws IllegalStateException if no context has been stored yet, for
     *         example when called before the Spring container has started
     */
    public static <T> T getBean(Class<T> clazz) {
        // Read the static field once so the null check and the lookup use the same reference.
        ApplicationContext context = applicationContext;
        if (context == null) {
            throw new IllegalStateException(
                    "Spring ApplicationContext has not been initialized; cannot look up bean of type "
                            + (clazz == null ? "null" : clazz.getName()));
        }
        return context.getBean(clazz);
    }
}
三、生产者线程
package com.test.jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Worker thread that repeatedly asks the shared {@link DelayOrderResource} to
 * produce orders into the delay queue.
 */
public class ProducerDelayQueueThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(ProducerDelayQueueThread.class);

    private final DelayOrderResource resource;

    /**
     * Creates a producer bound to the given resource.
     *
     * @param resource the shared queue resource; must not be {@code null}
     * @throws NullPointerException if {@code resource} is {@code null}
     */
    public ProducerDelayQueueThread(DelayOrderResource resource) {
        if (resource == null) {
            // Fail fast: a null resource would otherwise make run() log an NPE on every iteration.
            throw new NullPointerException("resource");
        }
        this.resource = resource;
    }

    /**
     * Calls {@code resource.produce()} in a loop until the current thread's
     * interrupt flag is observed, logging any exception and continuing.
     */
    @Override
    public void run() {
        logger.info("延迟队列生产者线程启动");
        // Checking the interrupt flag gives callers a way to stop the otherwise endless loop.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                resource.produce();
            } catch (Exception e) {
                logger.error("延迟队列生产异常", e);
            }
        }
    }
}
四、消费者线程
package com.test.jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Worker thread that repeatedly asks the shared {@link DelayOrderResource} to
 * consume orders from the delay queue.
 */
public class ConsumerDelayQueueThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerDelayQueueThread.class);

    private final DelayOrderResource resource;

    /**
     * Creates a consumer bound to the given resource.
     *
     * @param resource the shared queue resource; must not be {@code null}
     * @throws NullPointerException if {@code resource} is {@code null}
     */
    public ConsumerDelayQueueThread(DelayOrderResource resource) {
        if (resource == null) {
            // Fail fast: a null resource would otherwise make run() log an NPE on every iteration.
            throw new NullPointerException("resource");
        }
        this.resource = resource;
    }

    /**
     * Calls {@code resource.consume()} in a loop until the current thread's
     * interrupt flag is observed between iterations, logging any exception and
     * continuing.
     */
    @Override
    public void run() {
        logger.info("延迟队列消费者线程启动");
        // Checking the interrupt flag gives callers a way to stop the otherwise endless loop.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                resource.consume();
            } catch (Exception e) {
                logger.error("延迟队列消费异常", e);
            }
        }
    }
}
五、简单测试
/**
 * Demo entry point: creates one shared queue resource, then starts one
 * producer thread followed by one consumer thread that both operate on it.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    DelayOrderResource sharedResource = new DelayOrderResource();

    Thread producer = new ProducerDelayQueueThread(sharedResource);
    Thread consumer = new ConsumerDelayQueueThread(sharedResource);

    // Start order is kept: producer first, then consumer.
    producer.start();
    consumer.start();
}
|