IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Redis延时队列解决Java预约任务问题 -> 正文阅读

[大数据]Redis延时队列解决Java预约任务问题

没有用Java的延时队列Delayed,因为Delayed是单节点的,而redis的延时队列可以设置多节点。

先导入所需要的jar包redission

<!--redisson-->
		<dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.15.6</version>
		</dependency>
/**
 * 队列事件监听接口,需要实现这个方法
 *
 * @param <T>
 */
public interface RedisDelayedQueueListener<T> {
    /**
     * 执行方法
     */
    void invoke(T t);
}

创建数据封装类

@Data
public class TaskBodyDTO implements Serializable {
    private TaskData taskData;// 需要用到的数据

    private String topicName;

    private String name;// 项目名,应该是用来分组的
}

延时队列:

@Component
@Slf4j
public class RedisDelayedQueue implements RedisDelayedQueueListener<TaskBodyDTO>{

    private final CrwalProduce crwalProduce;

   private final RedissonClient redissonClient;

    public RedisDelayedQueue(CrwalProduce crwalProduce, RedissonClient redissonClient) {
        this.crwalProduce = crwalProduce;
        this.redissonClient = redissonClient;
    }

    /**
     * 添加队列
     *
     * @param t        DTO传输类
     * @param delay    时间数量
     * @param timeUnit 时间单位
     * @param <T>      泛型
     */
    public <T> void addQueue(T t, long delay, TimeUnit timeUnit) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue("collect");
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
//        delayedQueue.destroy();
    }

    /**
     *回调方法
     */
    @Override
    public void invoke(TaskBodyDTO taskBodyDTO) {
        //这里调用你延迟之后的代码
        String message=null;
        try {
            message = new ObjectMapper().writeValueAsString(taskBodyDTO.getTaskData());
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        crwalProduce.send(taskBodyDTO.getTopicName(), message);
        log.info("执行...." + taskBodyDTO.getTaskData().toString() + "===" + taskBodyDTO.getName());
    }
}

初始化队列监听:

/**
 * 初始化队列监听
 */
@Component
@Slf4j
public class RedisDelayedQueueInit implements ApplicationContextAware {

    @Autowired
    RedissonClient redissonClient;

    /**
     * 获取应用上下文并获取相应的接口实现类
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
        for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
            String listenerName = taskEventListenerEntry.getValue().getClass().getName();
            startThread(listenerName, taskEventListenerEntry.getValue());
        }
    }

    /**
     * 启动线程获取队列*
     *
     * @param queueName                 queueName
     * @param redisDelayedQueueListener 任务回调监听
     * @param <T>                       泛型
     * @return
     */
    private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue("collect");
        //由于此线程需要常驻,可以新建线程,不用交给线程池管理
        Thread thread = new Thread(() -> {
            log.info("启动监听队列线程" + queueName);
            while (true) {
                try {
                    T t = blockingFairQueue.take();
                    log.info("监听队列线程{},获取到值:{}", queueName, JSON.toJSONString(t));
                    new Thread(() -> {
                        redisDelayedQueueListener.invoke(t);
                    }).start();
                } catch (Exception e) {
                    log.info("监听队列线程错误,", e);
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        });
        thread.setName(queueName);
        thread.start();
    }

}

注:RedissionClient是自己创建并注入SpringIoc容器的

@Bean
	public RedissonClient redissonClient(){
		Config config = new Config();
		config.useSingleServer()
				.setAddress("redis://localhost:6379")
				.setPassword("foobared");
		return Redisson.create(config);
	}

新建的时候可以设置多个节点

然后调用

redisDelayedQueue.addQueue(taskBodyDTO,between.getSeconds(), TimeUnit.SECONDS);就把任务添加到了队列中 第二个参数是要延迟的时间,第三个参数是时间单位

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-29 12:12:55  更:2022-04-29 12:15:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 10:12:15-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码