IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 阿里或AWS Redis Key 失效事件实现延时任务 -> 正文阅读

[大数据]阿里或AWS Redis Key 失效事件实现延时任务

??

一、业务场景

  1. 某个设备需要保持激活状态,当失效的时候需要预警。

  1. 订单付款后需要在24小时之后触发短信提醒等等。

二、常见方案

  • 手动无线循环;

  • ScheduledExecutorService;

  • DelayQueue;

  • Redis zset 数据判断的方式;

  • Redis 键空间通知的方式;

  • Netty 提供的 HashedWheelTimer 工具类;

  • RabbitMQ 死信队列;

  • RabbitMQ 延迟消息插件 rabbitmq-delayed-message-exchange;

  • Spring Scheduled;

  • Quartz。

实现方式这里也例举一些,感兴趣可以自己扩展。本文主要讲通过redis实现。因为项目本身有使用redis的场景。

三、具体实现

Redis zset 数据判断的方式

  1. 借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 Jedis 框架):

  2. import redis.clients.jedis.Jedis;
    import utils.JedisUtils;
    import java.time.Instant;
    import java.util.Set;
    public class RedisDelayQueueZset {
        // zset key
        private static final String KEY = "ghDelayQueue";
        
        public static void main(String[] args) throws InterruptedException {
            Jedis jedis = JedisUtils.getJedis();
            // 延迟 30s 执行(30s 后的时间)
            long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
            jedis.zadd(KEY, delayTime, "order_1");
            // 继续添加测试数据
            jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
            jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
            jedis.zadd(KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
            jedis.zadd(KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
            // 开启延迟队列
            doDelayQueue(jedis);
        }
        /**
         * 延迟队列消费
         * @param jedis Redis 客户端
         */
        public static void doDelayQueue(Jedis jedis) throws InterruptedException {
            while (true) {
                // 当前时间
                Instant nowInstant = Instant.now();
                long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
                long nowSecond = nowInstant.getEpochSecond();
                // 查询当前时间的所有任务
                Set<String> data = jedis.zrangeByScore(KEY, lastSecond, nowSecond);
                for (String item : data) {
                    // 消费任务
                    System.out.println("消费:" + item);
                }
                // 删除已经执行的任务
                jedis.zremrangeByScore(KEY, lastSecond, nowSecond);
                Thread.sleep(1000); // 每秒轮询一次
            }
        }
    }

  3. 缺点:当数据量非常多的时候,比如超出redis读写瓶颈。又或者数据很少的时候。依旧要有一个循环在每秒操作。都显得不是很优雅于是有了下面的方式

Redis 键空间通知的方式

  1. 通过 notify-keyspace-events Ex 的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,利用这个机制实现延时任务的功能。

  2. 拿业务场景1来分析:

当设备上行或者心跳触发。那么我们更新key的失效时间。如果长时间掉线。那么监听到事件触发预警

? ? ?3.设置参数 notify-keyspace-events Ex

阿里云redis 设置完立马生效不需要重启

?

aws redis

默认的参数组只有common可以修改,所以如果用的不是common需要新建一个参数组覆盖原来的。这里我选择新建test 修改参数组也是立马生效不需要重启

? ? ? 4.代码实现:

redis失效监听器注册

/**
 * Redis 消息监听器容器.
 *
 * @param redisConnectionFactory the redis connection factory
 * @return the redis message listener container
 */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
    redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
    return redisMessageListenerContainer;
}


/**
 * Redis Key失效监听器注册为Bean.
 *
 * @param redisMessageListenerContainer the redis message listener container
 * @return the redis event message listener
 */
@Bean
public RedisEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {
    return new RedisEventMessageListener(redisMessageListenerContainer);
}

继承事件监听

package cn.fuzhi.cloud.v2.manager.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.time.LocalDateTime;

public class RedisEventMessageListener extends KeyExpirationEventMessageListener {
 
    /**
     * Creates new {@link RedisEventMessageListener} for {@code __keyevent@*__:expired} messages.
     *
     * @param listenerContainer must not be {@literal null}.
     */
    public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
 
    @Override
    protected void doHandleMessage(Message message) {
        // 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
        // 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
        String key = message.toString();
        System.out.println("key = " + key);
        System.out.println("end = " + LocalDateTime.now());
    }
}

四、redis-cli测试notify-keyspace-events是否生效

先开一个客户端连接

redis-cli -h xxxx -p 6379 -a password

连接成功输入

PSUBSCRIBE __keyevent@*__:expired

?

出现这个就可以了先不要关闭窗口

另外开一个窗口

 

setex ghDelayQueue 3 3

?

出现上述输出就表示配置成功

注意一点就是这种方式:事件触发后对应的value是拿不到的,只能获取到key的内容。所以要对key的命名给出规范

最后有啥提出一起交流

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:22:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:17:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码