IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 基于redis实现延时任务 -> 正文阅读

[大数据]基于redis实现延时任务

redis的zset是有序集合,默认根据score升序排序。并且可以根据scope范围查询,因此可以启动一个线程循环执行范围查询,获取当前时间之前的数据,即要执行任务,(因为不是严格按照时间匹配的,因此可能会有一点时间偏差,但一般情况下不会有影响),处理完后删除缓存。考虑到线程有可能会异常退出(比如redis连接异常等),因此使用监听者模式设计了线程重启方案,监听者会监听线程,当线程出现异常时监听者会重启线程。下面是具体代码。

@Configuration
public class RedisTemplateConfig {

    @Bean(name = "myredis")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        // 使用GenericFastJsonRedisSerializer 来序列化和反序列化redis的value值
        GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        redisTemplate.setValueSerializer(genericFastJsonRedisSerializer);
        redisTemplate.setHashValueSerializer(genericFastJsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
public class DelayTaskConsumerThread extends Observable implements Runnable {

    @Override
    public void run() {
        System.out.println("线程启动:"+Thread.currentThread().getId());
        while (true){
            try {
                long time = System.currentTimeMillis();
                consumeMsg("testDelayTask",time);
            } catch (Exception e) {
                e.printStackTrace();

                System.out.println("线程异常:"+Thread.currentThread().getId());
                doBusiness();
                break;
            }
        }
    }
    private void consumeMsg(String key,long time){
        RedisTemplate redisTemplate = (RedisTemplate) SpringContextUtil.getBean("myredis");
        Set set = redisTemplate.opsForZSet().rangeByScore(key,0l, time);
        if (set.size() > 0){
            System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+set.size()+"]条数据,[time="+time+"]");
            Iterator iterator = set.iterator();
            String next="";
            while (iterator.hasNext()){
                try {
                    next = (String) iterator.next();
                    System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ next +"]");
                }catch (Exception e){
                    //这里可以对消费异常的数据做进一步处理
                    throw e;
                }finally {
                    redisTemplate.opsForZSet().remove(key,next);
                }
            }
        }else {
            System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]");
        }
    }

    public void doBusiness(){
        if(true){
            super.setChanged();
        }
        notifyObservers();
    }
}
public class DelayTaskConsumerListener implements Observer {
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("DelayTaskConsumerThread线程异常退出,重新启动");
        DelayTaskConsumerThread run = new DelayTaskConsumerThread();
        run.addObserver(this);
        ThreadPoolUtil.execute(run);
        System.out.println("DelayTaskConsumerThread线程已重启");
    }
}
@Component
public class DelayTaskConsumerRunner implements ApplicationRunner {
    @Autowired
    @Qualifier(value = "myredis")
    private RedisTemplate redisTemplate;

    @Override
    public void run(ApplicationArguments args) {
        redisTemplate.delete("testDelayTask");
        DelayTaskConsumerThread taskConsumerThread = new DelayTaskConsumerThread();
        DelayTaskConsumerListener taskConsumerListener = new DelayTaskConsumerListener();
        taskConsumerThread.addObserver(taskConsumerListener);

        ThreadPoolUtil.execute(taskConsumerThread);
    }
}
@Component
public class SpringContextUtil implements ApplicationContextAware {
    /**
     * 上下文对象实例
     */
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringContextUtil.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     *
     * @return
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     *
     * @param name
     * @return
     */
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     *
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     *
     * @param name
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

}
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public CommonResponse test() {
        produceMsg("testDelayTask", UUID.randomUUID().toString(),System.currentTimeMillis());//这里的时间戳实际是需要从页面选择的任务执行时间,这里为了方便直接使用了系统当前时间戳
        return new CommonResponse();
    }

    /**
     *
     * @param key
     * @param value
     * @param score 设置的任务执行时间
     */
    private void produceMsg(String key,String value,long score){
        Boolean add = redisTemplate.opsForZSet().add(key, value, score);
        if (add){
            System.out.println("发送消息:[value="+value+",score="+score+"]");
        }else {
            System.out.println("消息发送失败:[value="+value+",score="+score+"]");
        }
    }
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-26 11:47:11  更:2022-04-26 11:47:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 11:07:35-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码