Redis 的 zset 是有序集合,默认按 score 升序排序,并且支持按 score 范围查询。因此可以启动一个线程循环执行范围查询,取出 score 不大于当前时间的数据,即已到期、需要执行的任务(由于不是严格按照时间点匹配,可能会有少量时间偏差,但一般情况下不会有影响),处理完后再从缓存中删除。考虑到线程有可能异常退出(比如 Redis 连接异常等),这里使用观察者模式(监听者)设计了线程重启方案:监听者监听消费线程,当线程出现异常时由监听者重新启动一个线程。下面是具体代码。
/**
 * Spring configuration that registers a customised {@link RedisTemplate} under the bean name
 * {@code "myredis"}.
 */
@Configuration
public class RedisTemplateConfig {

    /**
     * Builds a template whose keys and hash keys are serialized with
     * {@link StringRedisSerializer} and whose values and hash values are serialized with
     * {@link GenericFastJsonRedisSerializer}.
     *
     * @param factory connection factory handed to the template
     * @return the template, after {@code afterPropertiesSet()} has been called on it
     */
    @Bean(name = "myredis")
    public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
        // One serializer instance per role, shared between the plain and the hash variant.
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        GenericFastJsonRedisSerializer valueSerializer = new GenericFastJsonRedisSerializer();

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        template.setKeySerializer(keySerializer);
        template.setValueSerializer(valueSerializer);
        template.setHashKeySerializer(keySerializer);
        template.setHashValueSerializer(valueSerializer);

        template.afterPropertiesSet();
        return template;
    }
}
/**
 * Background worker that repeatedly queries a Redis sorted set for members whose score is not
 * greater than the current time in milliseconds and consumes them.
 *
 * <p>When a poll fails with an exception, the worker notifies its observers and leaves its loop;
 * an observer may react by submitting a replacement worker. When the worker is interrupted while
 * pausing between polls, it stops without notifying anyone.
 */
public class DelayTaskConsumerThread extends Observable implements Runnable {

    /** Sorted-set key that is polled for due members. */
    private static final String DELAY_TASK_KEY = "testDelayTask";

    /** Pause between two polls so the loop does not query Redis back-to-back. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * Polls until an exception or an interrupt ends the loop.
     */
    @Override
    public void run() {
        System.out.println("线程启动:"+Thread.currentThread().getId());
        while (true) {
            try {
                consumeMsg(DELAY_TASK_KEY, System.currentTimeMillis());
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // An interrupt is a request to stop, not a failure: keep the flag set for the
                // caller and exit without asking observers for a restart.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("线程异常:"+Thread.currentThread().getId());
                doBusiness();
                break;
            }
        }
    }

    /**
     * Consumes every member of {@code key} whose score lies in {@code [0, time]}.
     *
     * <p>Each member is removed from the sorted set <em>before</em> it is consumed, and it is
     * only consumed when the removal reports that this caller actually removed it. That way the
     * member that is deleted is always the one being handled, and a second consumer polling the
     * same key cannot consume the same member again. The trade-off is that a member whose
     * consumption fails after removal is not retried (the previous implementation also removed
     * members regardless of the outcome).
     *
     * @param key  sorted-set key to query
     * @param time upper score bound, in the same unit producers use for the score
     */
    private void consumeMsg(String key, long time) {
        @SuppressWarnings("unchecked") // the "myredis" bean is built as RedisTemplate<String, Object>
        RedisTemplate<String, Object> redisTemplate =
                SpringContextUtil.getBean("myredis", RedisTemplate.class);

        Set<Object> due = redisTemplate.opsForZSet().rangeByScore(key, 0, time);
        // The result is documented as nullable (pipeline / transaction use), so guard against it.
        if (due == null || due.isEmpty()) {
            System.out.println("线程:"+Thread.currentThread().getId()+","+"没有消息[time="+time+"]");
            return;
        }

        System.out.println("线程:"+Thread.currentThread().getId()+","+"获取到["+due.size()+"]条数据,[time="+time+"]");
        for (Object member : due) {
            Long removed = redisTemplate.opsForZSet().remove(key, member);
            if (removed == null || removed == 0L) {
                // Nothing was removed by this call, so someone else already took this member.
                continue;
            }
            System.out.println("线程:"+Thread.currentThread().getId()+","+"消费消息:[value="+ member +"]");
        }
    }

    /**
     * Marks this observable as changed and notifies all registered observers.
     */
    public void doBusiness() {
        setChanged();
        notifyObservers();
    }
}
/**
 * Observer that reacts to a notification from a {@link DelayTaskConsumerThread} by submitting a
 * fresh worker, with itself registered as the new worker's observer.
 */
public class DelayTaskConsumerListener implements Observer {

    /**
     * Pause before a replacement worker is submitted. Without it, a failure that persists
     * (the worker fails again right after starting) would turn into a tight restart loop.
     */
    private static final long RESTART_DELAY_MILLIS = 1000L;

    /**
     * Waits {@link #RESTART_DELAY_MILLIS} and then starts a replacement worker. The wait happens
     * on the thread that delivered the notification. If that thread is interrupted while
     * waiting, the interrupt flag is restored and no replacement is started.
     *
     * @param o   the observable that sent the notification (not inspected)
     * @param arg notification argument (not inspected)
     */
    @Override
    public void update(Observable o, Object arg) {
        try {
            Thread.sleep(RESTART_DELAY_MILLIS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }

        System.out.println("DelayTaskConsumerThread线程异常退出,重新启动");
        DelayTaskConsumerThread replacement = new DelayTaskConsumerThread();
        // Keep watching the replacement so a later failure is handled the same way.
        replacement.addObserver(this);
        ThreadPoolUtil.execute(replacement);
        System.out.println("DelayTaskConsumerThread线程已重启");
    }
}
/**
 * Application start-up hook that submits the delay-task consumer worker, with a
 * {@link DelayTaskConsumerListener} registered as its observer.
 */
@Component
public class DelayTaskConsumerRunner implements ApplicationRunner {

    @Autowired
    @Qualifier("myredis")
    private RedisTemplate redisTemplate;

    /**
     * Deletes the {@code "testDelayTask"} key, then creates the worker and its listener and
     * hands the worker to {@code ThreadPoolUtil}.
     *
     * @param args application arguments (unused)
     */
    @Override
    public void run(ApplicationArguments args) {
        // NOTE(review): this drops whatever is stored under the key on every start-up;
        // confirm that discarding pending entries is intended outside of a demo.
        redisTemplate.delete("testDelayTask");

        DelayTaskConsumerListener restartListener = new DelayTaskConsumerListener();
        DelayTaskConsumerThread worker = new DelayTaskConsumerThread();
        worker.addObserver(restartListener);

        ThreadPoolUtil.execute(worker);
    }
}
/**
 * Static access point to the Spring {@link ApplicationContext}, for code that is not itself
 * created by Spring. The context is stored when Spring calls
 * {@link #setApplicationContext(ApplicationContext)}; the static accessors use that stored value.
 */
@Component
public class SpringContextUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    /**
     * Stores the supplied context in the static field.
     *
     * @param context the context to remember
     */
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }

    /**
     * @return the stored context, or {@code null} if none has been stored yet
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * Looks a bean up by name.
     *
     * @param name bean name
     * @return the bean returned by the context
     */
    public static Object getBean(String name) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name);
    }

    /**
     * Looks a bean up by type.
     *
     * @param clazz required bean type
     * @return the bean returned by the context
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(clazz);
    }

    /**
     * Looks a bean up by name and type.
     *
     * @param name  bean name
     * @param clazz required bean type
     * @return the bean returned by the context
     */
    public static <T> T getBean(String name, Class<T> clazz) {
        ApplicationContext context = getApplicationContext();
        return context.getBean(name, clazz);
    }
}
/**
 * Test endpoint: adds one entry with a random UUID as value and the current time in
 * milliseconds as score to the {@code "testDelayTask"} sorted set.
 *
 * @return a new, empty {@code CommonResponse}
 */
@GetMapping(value = "/test", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public CommonResponse test() {
    String payload = UUID.randomUUID().toString();
    long score = System.currentTimeMillis();
    produceMsg("testDelayTask", payload, score);
    return new CommonResponse();
}
/**
 * Adds {@code value} with the given {@code score} to the sorted set stored at {@code key} and
 * prints whether the add was reported as successful.
 *
 * <p>The result of the add is a nullable {@code Boolean}; anything other than {@code true}
 * (including {@code null}) is reported as a failure instead of being unboxed, which would throw
 * a {@code NullPointerException} for {@code null}.
 *
 * @param key   sorted-set key
 * @param value member to add
 * @param score score to store the member under
 */
private void produceMsg(String key, String value, long score) {
    Boolean added = redisTemplate.opsForZSet().add(key, value, score);
    if (Boolean.TRUE.equals(added)) {
        System.out.println("发送消息:[value="+value+",score="+score+"]");
    } else {
        System.out.println("消息发送失败:[value="+value+",score="+score+"]");
    }
}
|