| SpringBoot提供了一种方式对应用服务全部启动成功后来进行初始操作.CommandLineRunner、ApplicationRunner,通过实现这两个接口的run方法就能保证在项目运行后为我们初始化一些操作.1.redis延迟队列的实现,首先我们选用它的Zset来达成我们的目标
 延迟队列的通用接口:
 public interface RedisDelayedQueueService {
  
    void add(String queueName, long delayedSecond, String val);
    
    List<String> get(String queueName);
}
 为其设计的Zset实现类 @Component
public class RedisDelayedQueueServiceImpl implements RedisDelayedQueueService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void add(String queueName, long delayedSecond, String val) {
        long expire = System.currentTimeMillis() + delayedSecond * 1000;
        stringRedisTemplate.opsForZSet().add(queueName, val, expire);
    }
    
    @Override
    public List<String> get(String queueName) {
        String now = System.currentTimeMillis() + "";
        List<String> keyList = new ArrayList<>();
        keyList.add(queueName);
        RedisScript<List<String>> redisScript = LuaScript.ZSET_GET_BY_SCORE_AND_DEL.getRedisScript();
        return stringRedisTemplate.execute(redisScript, keyList,"0", now);
    }
}
 主要关注这个get方法,这个方法是实现延迟的关键 ZSET_GET_BY_SCORE_AND_DEL("延时队列获取元素", null,
            "local zRes=redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) " +
                    "if #zRes == 0 then return {} end " +
                    "redis.call('zremrangebyscore',KEYS[1],ARGV[1],ARGV[2]) return zRes",
            List.class)
 利用lua脚本控制延时2.延时线程类
 @Slf4j
public abstract class AbstractDelayedThread extends Thread {
    @Resource
    private RedisDelayedQueueService redisDelayedQueueService;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void run() {
        while (true) {
            try {
                List<String> list = redisDelayedQueueService.get(queueKey());
                for (String id : list) {
                    try{
                        ThreadPool.instance().submit(() -> action(id));
                    }catch (Exception e){
                        String key = RedisKey.REDIS_DELAYED_FAULT_LOG.key(Long.valueOf(id));
                        long time = System.currentTimeMillis();
                        Map<String,Object> map=new HashedMap();
                        map.put("queueKey",queueKey());
                        map.put("id",id);
                        map.put("creatTime",time);
                        stringRedisTemplate.opsForValue().set(key, map.toString());
                        log.warn("redis延时任务执行失败");
                    }
                }
                Thread.sleep(1000);
            } catch (Exception e) {
                
                log.error("监听拼团计时结束队列异常,异常信息:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    
    protected abstract void action(String id);
    
    protected abstract String queueKey();
}
 run方法不停的从redis中取出key为设置好的常量再把他的value放到action中去执行举一个他的子类作为例子
 @Component
@Slf4j
public class OrderCancelThread extends AbstractDelayedThread {
    @Resource
    private McOrderService mcOrderService;
    @Override
    protected void action(String id) {
    
        mcOrderService.cancel(Long.parseLong(id));
    }
    @Override
    protected String queueKey() {
        return RedisDelayedKey.ORDER_CANCEL.getKey();
    }
}
 3.让它能够自己在项目初始化后动起来 @Component
@Slf4j
public class DelayedQueueService implements CommandLineRunner {
    @Resource
    private DelayedQueueProcessorRegistry delayedQueueProcessorRegistry;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Override
    public void run(String... args) {
        
        LuaScript.loadScript(stringRedisTemplate);
        
        delayedQueueProcessorRegistry.threadStart();
    }
}
 4.实现后置处理器来筛选所有实现AbstractDelayedThread的实例bean @Component
public class DelayedQueueProcessorRegistry implements BeanPostProcessor {
    private static final List<AbstractDelayedThread> INIT_PROCESS_LIST = new ArrayList<>();
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AbstractDelayedThread) {
            INIT_PROCESS_LIST.add((AbstractDelayedThread) bean);
        }
        return bean;
    }
    
    public void threadStart(){
        for (AbstractDelayedThread thread : INIT_PROCESS_LIST) {
            thread.start();
        }
    }
}
 实现了CommandLineRunner 接口的run方法来让它在项目初始化时调用方法,DelayedQueueProcessorRegistry实现了后置处理器接口,只要继承了AbstractDelayedThread的所有类都会被添加到List中,最后在threadStart被启动.通过这一系列操作,我们最终完成了redis实现延迟队列的操作.
 |