Django+Apscheduler 开发定时任务模块
目录
本文章要实现的是Django+Apscheduler 开发定时任务模块,并使用uwsgi+nginx模式部署,
且能避免多进程下任务重复执行,且能够页面动态操作任务;
一、回顾
上一节添加了页面的三个按钮,以及Apscheduler工具类 【Django+Apscheduler 开发定时任务模块】【一】 【Django+Apscheduler 开发定时任务模块】【二】 【Django+Apscheduler 开发定时任务模块】【三】
二、定义定时任务函数和装饰器
1、任务函数很简单,一个普通函数就好
def test(*args):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now + '__这是一个测试的定时任务:' + args[0])
2、使用redis前简单说一下思路,先贴一下uwsgi的配置:
<uwsgi>
<socket>127.0.0.1:8997</socket>
<chdir>/Users/PycharmProjects/autojob/</chdir>
<module>autojob.wsgi</module>
<processes>4</processes>
<daemonize>uwsgi.log</daemonize>
<enable-threads>True</enable-threads>
<preload>True</preload>
<lazy-apps>True</lazy-apps>
</uwsgi>
可以看到,uwsgi的配置中进程数写了4,这种情况下,如果正常启动我们的项目并且同时启动定时任务,那么每个启动的任务都会在同时执行4次;
也就是说,每个线程中都存在定时任务对象,我们要解决的也正是这个问题。
若是将进程数改为1,这个问题解决了,又迎来新的问题,只有一个进程,项目遇到高并发或者阻塞请求的时候还能够正常运行吗!
我想到的解决方案是使用redis锁来实现多进程下的Apscheduler定时任务,讲一下思路再继续:
项目启动时同时启动定时任务,还是4个进程;若是有状态为启动的任务,就将任务添加到池子中,也就是每个进程中都有一个池子。
假设有一个任务test为启动状态,四个进程都有这个任务,频率为每分钟执行一次,相当于四个相同的任务,运行时都会调同一个函数。
我们写个装饰器,在调用函数之前根据任务触发器和id添加redis锁,抢到锁的进程可以继续执行,没有抢到的进程中将该任务从任务池中删掉,只保留一个,若有多个任务同理;
到这里还是有问题的,后面再说
3、给任务函数添加装饰器 job_before:
def job_before(func):
def wrapper(*args, **kwargs):
from app.job_new import JobAction
job_state = job_list.objects.filter(id=int(args[1])).values('job_state')[0]['job_state']
if job_state == 1:
job_value = cache.get(args[0])
if job_value:
JobAction.modify_job(args[0], job_value)
cache.delete(args[0])
if get_lock(args[0]):
if args[2] == 'date' or (job_value and job_value[0] == 'date'):
job_list.objects.filter(id=int(args[1])).update(job_state=0)
func(*args, **kwargs)
else:
logger.info('该线程未取到锁,停止该任务!')
JobAction.stop_job(args[0])
else:
JobAction.stop_job(args[0])
return wrapper
def get_lock(lock_name):
lock = cache.lock('lock.' + lock_name, timeout=5)
return lock.acquire(blocking=False)
解释一下这个装饰器,
每次运行任务时都要先查看该任务在数据库的状态,若是停止状态那就不执行,直接从池中删除;
(因为4个进程,不可能每次操作都能准确的使用到有该任务的进程,若在该进程就直接停止了,若不在会在下次执行的时间节点停止;)
若是运行状态还是要检查redis是否有该任务的修改策略,有的话先修改池中任务的策略,并且执行本次任务;
(这里可以根据个人情况修改;若想尽快执行任务,也可以直接将该条任务置为停止,在添加一条数据使用该任务触发器;)
date类型的任务执行完成后默认就不会再执行了,所以这了写了date类型任务直接将数据改为了停止状态,并且运行本次任务;
锁的超时时间根据个人情况修改
4、修改任务函数
给函数添加装饰器
@job_before
def test(*args):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now + '__这是一个测试的定时任务:' + args[0])
整个项目大概就这么多了,几乎所有的代码都在这了,写的很粗糙,大家自己拼一拼吧。 我还要再整理一下,有一些改进的思路,整体其实有很大问题的,也欢迎大佬们指出来。 后期项目代码会放到github上开源,有需要的小伙伴也可以私信我
2022-05-03
|