SpringBoot提供了一种方式对应用服务全部启动成功后来进行初始操作.CommandLineRunner、ApplicationRunner,通过实现这两个接口的run方法就能保证在项目运行后为我们初始化一些操作. 1.redis延迟队列的实现,首先我们选用它的Zset来达成我们的目标 延迟队列的通用接口:
public interface RedisDelayedQueueService {
void add(String queueName, long delayedSecond, String val);
List<String> get(String queueName);
}
为其设计的Zset实现类
@Component
public class RedisDelayedQueueServiceImpl implements RedisDelayedQueueService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void add(String queueName, long delayedSecond, String val) {
long expire = System.currentTimeMillis() + delayedSecond * 1000;
stringRedisTemplate.opsForZSet().add(queueName, val, expire);
}
@Override
public List<String> get(String queueName) {
String now = System.currentTimeMillis() + "";
List<String> keyList = new ArrayList<>();
keyList.add(queueName);
RedisScript<List<String>> redisScript = LuaScript.ZSET_GET_BY_SCORE_AND_DEL.getRedisScript();
return stringRedisTemplate.execute(redisScript, keyList,"0", now);
}
}
主要关注这个get方法,这个方法是实现延迟的关键
ZSET_GET_BY_SCORE_AND_DEL("延时队列获取元素", null,
"local zRes=redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) " +
"if #zRes == 0 then return {} end " +
"redis.call('zremrangebyscore',KEYS[1],ARGV[1],ARGV[2]) return zRes",
List.class)
利用lua脚本控制延时 2.延时线程类
@Slf4j
public abstract class AbstractDelayedThread extends Thread {
@Resource
private RedisDelayedQueueService redisDelayedQueueService;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void run() {
while (true) {
try {
List<String> list = redisDelayedQueueService.get(queueKey());
for (String id : list) {
try{
ThreadPool.instance().submit(() -> action(id));
}catch (Exception e){
String key = RedisKey.REDIS_DELAYED_FAULT_LOG.key(Long.valueOf(id));
long time = System.currentTimeMillis();
Map<String,Object> map=new HashedMap();
map.put("queueKey",queueKey());
map.put("id",id);
map.put("creatTime",time);
stringRedisTemplate.opsForValue().set(key, map.toString());
log.warn("redis延时任务执行失败");
}
}
Thread.sleep(1000);
} catch (Exception e) {
log.error("监听拼团计时结束队列异常,异常信息:", e);
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
}
}
}
protected abstract void action(String id);
protected abstract String queueKey();
}
run方法不停的从redis中取出key为设置好的常量再把他的value放到action中去执行 举一个他的子类作为例子
@Component
@Slf4j
public class OrderCancelThread extends AbstractDelayedThread {
@Resource
private McOrderService mcOrderService;
@Override
protected void action(String id) {
mcOrderService.cancel(Long.parseLong(id));
}
@Override
protected String queueKey() {
return RedisDelayedKey.ORDER_CANCEL.getKey();
}
}
3.让它能够自己在项目初始化后动起来
@Component
@Slf4j
public class DelayedQueueService implements CommandLineRunner {
@Resource
private DelayedQueueProcessorRegistry delayedQueueProcessorRegistry;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public void run(String... args) {
LuaScript.loadScript(stringRedisTemplate);
delayedQueueProcessorRegistry.threadStart();
}
}
4.实现后置处理器来筛选所有实现AbstractDelayedThread的实例bean
@Component
public class DelayedQueueProcessorRegistry implements BeanPostProcessor {
private static final List<AbstractDelayedThread> INIT_PROCESS_LIST = new ArrayList<>();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractDelayedThread) {
INIT_PROCESS_LIST.add((AbstractDelayedThread) bean);
}
return bean;
}
public void threadStart(){
for (AbstractDelayedThread thread : INIT_PROCESS_LIST) {
thread.start();
}
}
}
实现了CommandLineRunner 接口的run方法来让它在项目初始化时调用方法,DelayedQueueProcessorRegistry实现了后置处理器接口,只要继承了AbstractDelayedThread的所有类都会被添加到List中,最后在threadStart被启动. 通过这一系列操作,我们最终完成了redis实现延迟队列的操作.
|