一、背景,开发场景
之前负责开发过一个会议室系统,这个系统整体不难,唯一有一个技术难点,就是需要延时任务,会议的开始、结束需要系统自动更新状态(进行中、已结束)以及开始前多分钟、快结束后多少分钟需要发送邮件提醒、短信提醒,以及超时待审批的会议室申请自动审核不通过。这些操作对我们系统来说,不是固定时刻的定时任务轮询就可以的了,因为每一时刻(粒度可能会小到秒)都有可能有执行的。这种开发场景,就需要使用到延时任务了。
- 不使用延时任务不可以吗?使用定时任务每隔一段很短的时间轮询去数据库查询数据?当然可以,但是如果数据很大,数据库的处理能力却十分有限,导致系统有性能问题。
- 使用延时队列,可以事先每个一段时间,把接下来一段时间内要执行的任务查询出来,放到延时队列。然后每次只需查询延时队列即可,不需要再查询数据库,可以减缓数据库的压力
二、分析方案
-
使用redis的zset数据结构开发一个延时队列 ZSet数据结构类似于Set结构,只是ZSet结构中,每个元素都会有一个分值,然后所有元素按照分值的大小进行排列,相当于是一个进行了排序的链表。Redis中的ZSet是一个有序的Set,内部使用HashMap和跳表(SkipList)来保证数据的存储和有序,HashMap里放的是成员到score的映射,而跳跃表里存放的是所有的成员,排序依据是HashMap里存的score,使用跳跃表的结构可以获得比较高的查找效率,并且在实现上比较简单。 -
JDK ScheduledExecutorService -
时间轮 -
redis的key过期回调
由上分析,使用ScheduledExecutorService虽然简单,但是不支持分布式的部署,没有高可用,最终采取了 使用redis的zset数据结构开发一个延时队列,开发一个支持分布式、轻量简单、低延时、消息可靠的延时队列
三、 代码解析
这个使用redis的延时队列的设计方案分为四部分
- 延时队列(使用redis的zset结构开发延时队列)
- 延时任务的具体策略类(使用策略模式,不同延时任务有不同的类获取和执行延时任务)
- 消费延时任务的监听器,消费到期的消息任务(也是定时器)
- 增加延时任务的定时器,每隔一段时间执行,加入接下来一段时间内要执行的定时任务
延时队列
封装延时任务的bean
@Data
public class DelayMessage {
private Integer applyId;
private Date executeTime;
private String jobType;
private int level;
public DelayMessage(){
}
public DelayMessage(Integer applyId, Date executeTime, String jobType) {
this.applyId = applyId;
this.executeTime = executeTime;
this.jobType = StringUtils.uncapitalize(jobType);
this.level = 0;
}
}
根据redis的zset结构封装一个延时队列的类 (这个延时队列主要由增加任务到延时队列,从延时队列删除任务) 核心方法是take():从延时队列获取到期需要执行的消息
@Component
@Slf4j
public class DelayQueue {
@Value("${queue.key}")
private String MEETING_QUEUE_KEY;
@Value("${queue.job.max.retires}")
private int jobMaxRetires;
@Value("${queue.job.max.ms}")
private int jobMaxMs;
@Autowired
private RedisTemplate redisTemplate;
@Autowired
private RedisLock redisLock;
public void add(DelayMessage message){
String value =JSONObject.toJSONString(message);
redisTemplate.opsForZSet().add(MEETING_QUEUE_KEY,value,message.getExecuteTime().getTime());
}
public void add(Integer applyId, Date executeTime,String jobType){
DelayMessage message = new DelayMessage(applyId,executeTime,jobType);
this.add(message);
}
public void addAll(List<DelayMessage> messages){
if(!CollectionUtils.isEmpty(messages)){
messages.stream().forEach(message->{
this.add(message);
});
}
}
public void remove(DelayMessage message){
String value =JSONObject.toJSONString(message);
redisTemplate.opsForZSet().remove(MEETING_QUEUE_KEY,value);
}
public void remove(Integer applyId,Date executeTime,String jobType){
DelayMessage message = new DelayMessage(applyId,executeTime,jobType);
this.remove(message);
}
public void removeHandingMessage(DelayMessage message){
String value =JSONObject.toJSONString(message);
redisTemplate.opsForZSet().remove(MEETING_QUEUE_KEY+"_HANDING",value);
}
public List<DelayMessage> take(){
List<DelayMessage> delayMessageList = Collections.emptyList();
try{
boolean success = this.redisLock.lock(MEETING_QUEUE_KEY);
if (!success) {
return delayMessageList;
}
List<String> toDoMessageList = this.getMessageByQueue(MEETING_QUEUE_KEY);
List<String> overTimeMessageList = this.getMessageByQueue(MEETING_QUEUE_KEY+"_HANDING");
this.handToDoMessage(toDoMessageList);
this.handOvertimeMessage(overTimeMessageList);
if(!CollectionUtils.isEmpty(toDoMessageList)){
delayMessageList = toDoMessageList.stream().map(value->{
DelayMessage message = JSONObject.parseObject(value,DelayMessage.class);
return message;
}).collect(Collectors.toList());
}
}finally {
this.redisLock.delete(MEETING_QUEUE_KEY);
}
return delayMessageList;
}
private List<String> getMessageByQueue(String queueKey){
Set<ZSetOperations.TypedTuple<Object>> tuples = redisTemplate.opsForZSet().rangeWithScores(queueKey,0,-1);
if(queueKey.equals(MEETING_QUEUE_KEY)){
log.info("当前延时队列 待处理任务数量:{}",tuples.size());
}
Iterator<ZSetOperations.TypedTuple<Object>> iterator = tuples.iterator();
List<String> messageList = null;
while (iterator.hasNext())
{
ZSetOperations.TypedTuple<Object> typedTuple = iterator.next();
long nowScore = System.currentTimeMillis();
double score = typedTuple.getScore();
if(nowScore > score){
if(messageList == null){
messageList = new ArrayList<>();
}
String value = (String)typedTuple.getValue();
messageList.add(value);
}else{
break;
}
}
return messageList;
}
private void handToDoMessage(List<String> toDoMessageList){
if(!CollectionUtils.isEmpty(toDoMessageList)){
List<String> delValues = toDoMessageList;
redisTemplate.execute(new SessionCallback<Object>(){
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
operations.multi();
redisTemplate.opsForZSet().remove(MEETING_QUEUE_KEY,delValues.toArray());
for(String value:delValues){
long score = DateUtils.addSecond(new Date(),jobMaxMs).getTime();
redisTemplate.opsForZSet().add(MEETING_QUEUE_KEY+"_HANDING",value,score);
}
operations.exec();
return null;
}
});
}
}
private void handOvertimeMessage(List<String> overTimeMessageList){
if(!CollectionUtils.isEmpty(overTimeMessageList)){
List<String> delValues = overTimeMessageList;
redisTemplate.execute(new SessionCallback<Object>(){
@Override
public <K, V> Object execute(RedisOperations<K, V> operations) throws DataAccessException {
operations.multi();
redisTemplate.opsForZSet().remove(MEETING_QUEUE_KEY+"_HANDING",delValues.toArray());
for(String value:delValues){
DelayMessage message = JSONObject.parseObject(value,DelayMessage.class);
if(jobMaxRetires == -1 || message.getLevel() < jobMaxRetires){
message.setLevel(message.getLevel()+1);
String nowValue = JSONObject.toJSONString(message);
redisTemplate.opsForZSet().add(MEETING_QUEUE_KEY,nowValue,message.getExecuteTime().getTime());
}
}
operations.exec();
return null;
}
});
}
}
}
延时任务
上面有了延时队列了,但是延时任务要怎么加入延时队列呢?以及延时任务要如何获取? 不同延时任务有不同的延时任务的获取和执行任务的具体实现是不一样,例如修改会议为进行中、已结束和发送邮件提醒等任务,这些任务获取逻辑是不一样的,我们使用策略模式,把不同的延时任务封装为不同的策略,一个策略类对应一个延时任务的获取和执行。
延时任务的策略类的接口
public interface DelayJob {
String getJobName();
List<DelayMessage> getJob(Date curDate);
void executeJob(DelayMessage message) throws Exception;
}
zrangebyscore myzset 0 100 withscores
|