IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot项目运行初始化数据以及利用redis来实现延迟队列 -> 正文阅读

[Java知识库]SpringBoot项目运行初始化数据以及利用redis来实现延迟队列

SpringBoot提供了一种方式对应用服务全部启动成功后来进行初始操作.CommandLineRunner、ApplicationRunner,通过实现这两个接口的run方法就能保证在项目运行后为我们初始化一些操作.
1.redis延迟队列的实现,首先我们选用它的Zset来达成我们的目标
延迟队列的通用接口:

public interface RedisDelayedQueueService {
  
    void add(String queueName, long delayedSecond, String val);
    
    List<String> get(String queueName);
}

为其设计的Zset实现类

@Component
public class RedisDelayedQueueServiceImpl implements RedisDelayedQueueService {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void add(String queueName, long delayedSecond, String val) {
        long expire = System.currentTimeMillis() + delayedSecond * 1000;
        stringRedisTemplate.opsForZSet().add(queueName, val, expire);
    }
    
    @Override
    public List<String> get(String queueName) {
        String now = System.currentTimeMillis() + "";
        List<String> keyList = new ArrayList<>();
        keyList.add(queueName);
        RedisScript<List<String>> redisScript = LuaScript.ZSET_GET_BY_SCORE_AND_DEL.getRedisScript();
        return stringRedisTemplate.execute(redisScript, keyList,"0", now);
    }
}

主要关注这个get方法,这个方法是实现延迟的关键

ZSET_GET_BY_SCORE_AND_DEL("延时队列获取元素", null,
            "local zRes=redis.call('zrangebyscore',KEYS[1],ARGV[1],ARGV[2]) " +
                    "if #zRes == 0 then return {} end " +
                    "redis.call('zremrangebyscore',KEYS[1],ARGV[1],ARGV[2]) return zRes",
            List.class)

利用lua脚本控制延时
2.延时线程类

@Slf4j
public abstract class AbstractDelayedThread extends Thread {
    @Resource
    private RedisDelayedQueueService redisDelayedQueueService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void run() {
        while (true) {
            try {
                List<String> list = redisDelayedQueueService.get(queueKey());
                for (String id : list) {
                    try{
                        ThreadPool.instance().submit(() -> action(id));
                    }catch (Exception e){
                        String key = RedisKey.REDIS_DELAYED_FAULT_LOG.key(Long.valueOf(id));
                        long time = System.currentTimeMillis();
                        Map<String,Object> map=new HashedMap();
                        map.put("queueKey",queueKey());
                        map.put("id",id);
                        map.put("creatTime",time);
                        stringRedisTemplate.opsForValue().set(key, map.toString());
                        log.warn("redis延时任务执行失败");
                    }
                }
                Thread.sleep(1000);
            } catch (Exception e) {
                // nothing
                log.error("监听拼团计时结束队列异常,异常信息:", e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException interruptedException) {
                    interruptedException.printStackTrace();
                }
            }
        }
    }
    
    protected abstract void action(String id);
    
    protected abstract String queueKey();
}

run方法不停的从redis中取出key为设置好的常量再把他的value放到action中去执行
举一个他的子类作为例子

@Component
@Slf4j
public class OrderCancelThread extends AbstractDelayedThread {
    @Resource
    private McOrderService mcOrderService;

    @Override
    protected void action(String id) {
    //要重试的方法
        mcOrderService.cancel(Long.parseLong(id));
    }

    @Override
    protected String queueKey() {
        return RedisDelayedKey.ORDER_CANCEL.getKey();//常量key
    }
}

3.让它能够自己在项目初始化后动起来

@Component
@Slf4j
public class DelayedQueueService implements CommandLineRunner {
    @Resource
    private DelayedQueueProcessorRegistry delayedQueueProcessorRegistry;
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(String... args) {
        // redis预加载lua脚本指令
        LuaScript.loadScript(stringRedisTemplate);
        // 启动线程监听队列(定时轮询)
        delayedQueueProcessorRegistry.threadStart();
    }

}

4.实现后置处理器来筛选所有实现AbstractDelayedThread的实例bean

@Component
public class DelayedQueueProcessorRegistry implements BeanPostProcessor {
    private static final List<AbstractDelayedThread> INIT_PROCESS_LIST = new ArrayList<>();

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof AbstractDelayedThread) {
            INIT_PROCESS_LIST.add((AbstractDelayedThread) bean);
        }
        return bean;
    }

    /**
     * 所有队列线程启动
     **/
    public void threadStart(){
        for (AbstractDelayedThread thread : INIT_PROCESS_LIST) {
            thread.start();
        }
    }
}

实现了CommandLineRunner 接口的run方法来让它在项目初始化时调用方法,DelayedQueueProcessorRegistry实现了后置处理器接口,只要继承了AbstractDelayedThread的所有类都会被添加到List中,最后在threadStart被启动.
通过这一系列操作,我们最终完成了redis实现延迟队列的操作.

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-26 16:47:16  更:2022-06-26 16:47:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 16:33:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码