在前面的文章
第三方支付接口设计中我留了一个问题:
订单超时关闭。这个问题在面试当中也是经常被问到,本文我们就来展开说明一下。
和订单超时关闭类似的场景还有:
- 淘宝自动确认收货;
- 微信红包24小时未查收,需要延迟退还;
- 滴滴预约打车的时候,十分钟没有司机接单,系统会自动取消。
针对上述这些,到了目标时间,系统自动触发代替用户执行的任务,有一个专业的名字:延迟任务。
对于这一类需求我们最先想到的一般就是使用定时任务,通过扫描数据库符合条件的数据,并对其进行更新操作。
延迟任务和定时任务的区别:
- 定时任务有固定的触发时间,而延迟任务不固定,它依赖于业务事件的触发时间。(比如,取消订单是在生成订单后的半个小时);
- 定时任务是周期性的,而延迟任务被触发之后,就结束了,一般是一次性的;
- 定时任务一般处理的是多个任务,延迟任务一般是一个任务。
我们下面来看一下定时任务的实现。
定时任务实现
定时任务的实现有这么几种方式:
- JDK自带Timer实现
- Quartz框架实现
- Spring3.0以后自带的task
- 分布式任务调度:XXL-Job
大概逻辑如下:
假设订单表:t_order(id,end_time,status) ;
数据扫描:
select id from t_order where end_time>=30 and status=初始状态;
修改:
update t_order set status=结束 where id in (超时订单id);
注:如果超时的订单数量很大,就需要分页查询。
这种方式的优点是实现简单,支持分布式/集群环境。
缺点:
- 通过轮询不断地扫描数据库,如果数据量很大,并且任务的执行间隔时间较短,对数据库会造成一定的压力;
- 间隔时间粒度不好设置;
- 存在延迟:如果设置5分钟扫描一次,那么最坏的延迟时间就是5分钟。
被动取消
被动取消和懒加载的思想一致。当用户查询订单的时候,去判断订单是否超时,如果是,走超时的逻辑。
这种方式依赖用户的查询操作。如果用户一直不查询,那么订单就一直不会被取消。
这种方法就是实现简单,不需要增加额外的处理操作。缺点是时效性低,影响用户的体验。
现在也有用定时任务+被动取消的组合方式实现。
上面讲的是定时任务的解决方案,下面我们具体讲一讲延迟任务常见的技术实现。
JDK的延迟队列
通过JDK提供的DelayQueue 类来实现。DelayQueue 是一个支持延时获取元素的,无界阻塞队列。 队列中的元素必须实现 Delayed 接口,并重写 getDelay(TimeUnit) 和 compareTo(Delayed) 方法。
元素只有在延迟期满时才能从队列中取走。并且队列是有序的,队头放置的元素延迟到期时间最长。
代码演示
定义元素类,作为队列的元素:
public class MyDelayedTask implements Delayed {
private String orderId;
private long startTime;
private long delayMillis;
public MyDelayedTask(String orderId, long delayMillis) {
this.orderId = orderId;
this.startTime = System.currentTimeMillis();
this.delayMillis = delayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert((startTime + delayMillis) - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public void exec() {
System.out.println(orderId + "编号的订单要删除啦!!!");
}
}
测试:
public static void main(String[] args) throws InterruptedException {
List<String> list = new ArrayList<String>();
list.add("00000001");
list.add("00000002");
list.add("00000003");
list.add("00000004");
list.add("00000005");
long start = System.currentTimeMillis();
for (int i = 0; i < list.size(); i++) {
delayQueue.put(new MyDelayedTask(list.get(i), 3000));
delayQueue.take().exec();
System.out.println("After " + (System.currentTimeMillis() - start) + " MilliSeconds");
}
}
结果打印:
00000001编号的订单要删除啦!!!
After 3004 MilliSeconds
00000002编号的订单要删除啦!!!
After 6009 MilliSeconds
00000003编号的订单要删除啦!!!
After 9012 MilliSeconds
00000004编号的订单要删除啦!!!
After 12018 MilliSeconds
00000005编号的订单要删除啦!!!
After 15020 MilliSeconds
优点:效率高,任务触发时间延迟低。 缺点:
- 服务器重启后,数据全部消失,怕宕机
- 集群扩展相当麻烦
- 因为是无界队列,如果任务太多的话,那么很容易就出现OOM异常
- 代码复杂度较高
时间轮算法
时间轮是一种高效来利用线程资源来进行批量化调度的一种调度模型。把大批量的调度任务全部都绑定到同一个的调度器上面,使用这一个调度器来进行所有任务的管理(manager),触发(trigger)以及运行(runnable)。能够高效的管理各种延时任务,周期任务,通知任务等等。
缺点,时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太适合。因为时间轮算法的精度取决于,时间段“指针”单元的最小粒度大小,比如时间轮的格子是一秒跳一次,那么调度精度小于一秒的任务就无法被时间轮所调度。而且时间轮算法没有做宕机备份,因此无法再宕机之后恢复任务重新调度。
代码演示
依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>
Demo:
public class HashedWheelTimerTest {
private static final long start = System.currentTimeMillis();
public static void main(String[] args) {
HashedWheelTimer timer = new HashedWheelTimer(1,
TimeUnit.SECONDS,
10);
TimerTask task1 = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("已经过了" + costTime() + " 秒,task1 开始执行");
}
};
TimerTask task2 = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("已经过了" + costTime() + " 秒,task2 开始执行");
}
};
TimerTask task3 = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("已经过了" + costTime() + " 秒,task3 开始执行");
}
};
timer.newTimeout(task1, 0, TimeUnit.SECONDS);
timer.newTimeout(task2, 3, TimeUnit.SECONDS);
timer.newTimeout(task3, 15, TimeUnit.SECONDS);
}
private static Long costTime() {
return (System.currentTimeMillis() - start) / 1000;
}
}
Redis zset 实现延迟任务
zset 是一个有序集合,ZSet结构中,每个元素(member)都会有一个分值(score),然后所有元素按照分值的大小进行排列。
我们将订单超时时间戳与订单号分别设置为 score 和 member 。也就是说集合列表中的记录是按执行时间排序,我们只需要取小于当前时间的即可。
代码演示
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.UUID;
@Configuration
public class RedisDelayDemo {
@Autowired
private RedisTemplate redisTemplate;
public void setDelayTasks(long delayTime) {
String orderId = UUID.randomUUID().toString();
Boolean addResult = redisTemplate.opsForZSet().add("delayQueue", orderId, System.currentTimeMillis() + delayTime);
if (addResult) {
System.out.println("添加任务成功!" + orderId + ", 当前时间为" + LocalDateTime.now());
}
}
public void listenDelayLoop() {
while (true) {
Set<String> set = redisTemplate.opsForZSet().rangeByScore("delayQueue", 0, System.currentTimeMillis(), 0, 1);
if (set.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
continue;
}
String it = set.iterator().next();
if (redisTemplate.opsForZSet().remove("delayQueue", it) > 0) {
System.out.println("消息到期" + it + ",时间为" + LocalDateTime.now());
}
}
}
}
测试:
@RequestMapping("/delayTest")
public void delayTest() {
delayDemo.setDelayTasks(5000L);
delayDemo.listenDelayLoop();
}
结果打印:
添加任务成功!e99961a0-fc1d-43d4-a83e-8db5fb6b3273, 当前时间为2021-10-24T12:06:59.037363700
消息到期e99961a0-fc1d-43d4-a83e-8db5fb6b3273,时间为2021-10-24T12:07:04.097486
优点:
- 集群扩展方便
- 时间准确度高
- 不用担心宕机问题
缺点:需要额外进行redis维护。在高并发条件下,多消费者可能会取到同一个订单号。这种情况可以增加一个分布式锁来处理,但是,性能下降严重。
MQ 延时消息
我们可以通过MQ延时消息实现,以RocketMQ举例。
通常的消息在投递后会立马被消费者所消费,而延时消息在投递时,需要设置指定的延时级别(不同延迟级别对应不同延迟时间),即等到特定的时间间隔后消息才会被消费者消费,这样就将数据库层面的压力转移到了MQ中,也不需要手写定时器,降低了业务复杂度,同时MQ自带削峰功能,能够很好的应对业务高峰。
代码演示
依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.0.0-PREVIEW</version>
</dependency>
生产者demo:
@Component
public class ProducerSchedule {
private DefaultMQProducer producer;
@Value("${rocketmq.producer.producer-group}")
private String producerGroup;
@Value("${rocketmq.namesrv-addr}")
private String nameSrvAddr;
public ProducerSchedule() {
}
@PostConstruct
public void defaultMQProducer() {
if (Objects.isNull(this.producer)) {
this.producer = new DefaultMQProducer(this.producerGroup);
this.producer.setNamesrvAddr(this.nameSrvAddr);
}
try {
this.producer.start();
System.out.println("Producer start");
} catch (MQClientException e) {
e.printStackTrace();
}
}
public String send(String topic, String messageText) {
Message message = new Message(topic, messageText.getBytes());
message.setDelayTimeLevel(4);
SendResult result = null;
try {
result = this.producer.send(message);
System.out.println("返回信息:" + JSON.toJSONString(result));
} catch (Exception e) {
e.printStackTrace();
}
return result.getMsgId();
}
}
消费者demo:
@Component
public class ConsumerSchedule implements CommandLineRunner {
@Value("${rocketmq.consumer.consumer-group}")
private String consumerGroup;
@Value("${rocketmq.namesrv-addr}")
private String nameSrvAddr;
@Value("${rocketmq.topic}")
private String rocketmqTopic;
public void messageListener() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.consumerGroup);
consumer.setNamesrvAddr(this.nameSrvAddr);
consumer.subscribe(rocketmqTopic, "*");
consumer.setConsumeMessageBatchMaxSize(1);
consumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {
for (Message message : messages) {
System.out.println("监听到消息:" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
@Override
public void run(String... args) throws Exception {
this.messageListener();
}
}
设置消息延时级别的方法是setDelayTimeLevel() ,目前RocketMQ不支持任意时间间隔的延时消息,只支持特定级别的延时消息。
写在最后
今天是[2021-10-24],祝大家 程序员节快乐,早日实现自己的小目标!!!
如果你还想看更多优质原创文章,欢迎关注我的公众号「ShawnLux」。
|