??
一、业务场景
-
某个设备需要保持激活状态,当失效的时候需要预警。
-
订单付款后需要在24小时之后触发短信提醒等等。
二、常见方案
实现方式这里也例举一些,感兴趣可以自己扩展。本文主要讲通过redis实现。因为项目本身有使用redis的场景。
三、具体实现
Redis zset 数据判断的方式
-
借助 zset 数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助 Jedis 框架): -
import redis.clients.jedis.Jedis;
import utils.JedisUtils;
import java.time.Instant;
import java.util.Set;
public class RedisDelayQueueZset {
// zset key
private static final String KEY = "ghDelayQueue";
public static void main(String[] args) throws InterruptedException {
Jedis jedis = JedisUtils.getJedis();
// 延迟 30s 执行(30s 后的时间)
long delayTime = Instant.now().plusSeconds(30).getEpochSecond();
jedis.zadd(KEY, delayTime, "order_1");
// 继续添加测试数据
jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");
jedis.zadd(KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");
jedis.zadd(KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");
jedis.zadd(KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");
// 开启延迟队列
doDelayQueue(jedis);
}
/**
* 延迟队列消费
* @param jedis Redis 客户端
*/
public static void doDelayQueue(Jedis jedis) throws InterruptedException {
while (true) {
// 当前时间
Instant nowInstant = Instant.now();
long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒时间
long nowSecond = nowInstant.getEpochSecond();
// 查询当前时间的所有任务
Set<String> data = jedis.zrangeByScore(KEY, lastSecond, nowSecond);
for (String item : data) {
// 消费任务
System.out.println("消费:" + item);
}
// 删除已经执行的任务
jedis.zremrangeByScore(KEY, lastSecond, nowSecond);
Thread.sleep(1000); // 每秒轮询一次
}
}
} -
缺点:当数据量非常多的时候,比如超出redis读写瓶颈。又或者数据很少的时候。依旧要有一个循环在每秒操作。都显得不是很优雅于是有了下面的方式
Redis 键空间通知的方式
-
通过 notify-keyspace-events Ex 的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,利用这个机制实现延时任务的功能。 -
拿业务场景1来分析:
当设备上行或者心跳触发。那么我们更新key的失效时间。如果长时间掉线。那么监听到事件触发预警
? ? ?3.设置参数 notify-keyspace-events Ex
阿里云redis 设置完立马生效不需要重启
?
aws redis
默认的参数组只有common可以修改,所以如果用的不是common需要新建一个参数组覆盖原来的。这里我选择新建test 修改参数组也是立马生效不需要重启
? ? ? 4.代码实现:
redis失效监听器注册
/**
* Redis 消息监听器容器.
*
* @param redisConnectionFactory the redis connection factory
* @return the redis message listener container
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
/**
* Redis Key失效监听器注册为Bean.
*
* @param redisMessageListenerContainer the redis message listener container
* @return the redis event message listener
*/
@Bean
public RedisEventMessageListener redisEventMessageListener(RedisMessageListenerContainer redisMessageListenerContainer) {
return new RedisEventMessageListener(redisMessageListenerContainer);
}
继承事件监听
package cn.fuzhi.cloud.v2.manager.redis;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.time.LocalDateTime;
public class RedisEventMessageListener extends KeyExpirationEventMessageListener {
/**
* Creates new {@link RedisEventMessageListener} for {@code __keyevent@*__:expired} messages.
*
* @param listenerContainer must not be {@literal null}.
*/
public RedisEventMessageListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
protected void doHandleMessage(Message message) {
// 这个就是过期的key ,过期后,也就是事件触发后对应的value是拿不到的。
// 这里实现业务逻辑,如果是服务器集群的话需要使用分布式锁进行抢占执行。
String key = message.toString();
System.out.println("key = " + key);
System.out.println("end = " + LocalDateTime.now());
}
}
四、redis-cli测试notify-keyspace-events是否生效
先开一个客户端连接
redis-cli -h xxxx -p 6379 -a password
连接成功输入
PSUBSCRIBE __keyevent@*__:expired
?
出现这个就可以了先不要关闭窗口
另外开一个窗口
setex ghDelayQueue 3 3
?
出现上述输出就表示配置成功
注意一点就是这种方式:事件触发后对应的value是拿不到的,只能获取到key的内容。所以要对key的命名给出规范
最后有啥提出一起交流
|