Django+Apscheduler 开发定时任务模块
目录
本章节主要是对前面四个章节的拓展,前面章节的内容实际上已经实现了uwsgi多进程下Django+Apscheduler定时任务部署;
但是,弊端是在页面上只能有一条记录来执行某一个定时任务;对于有某些特殊需求可能无法支持;本章节做了一些调整;
一、回顾
上一节添加了页面的三个按钮,以及Apscheduler工具类 【Django+Apscheduler 开发定时任务模块】【一】 【Django+Apscheduler 开发定时任务模块】【二】 【Django+Apscheduler 开发定时任务模块】【三】 【Django+Apscheduler 开发定时任务模块】【四】
二、正文:添加任务可选择触发器
1、修改models.py
(1)新增model:JobListTrigger
class JobListTrigger(models.Model):
trigger_name = models.CharField('触发器名称', max_length=25)
trigger_func = models.CharField('触发器', max_length=25)
func_desc = '例:from app.test_job import test'
func_path = models.CharField('触发器路径', help_text=func_desc, max_length=255)
description = models.CharField('描述', max_length=50)
def __str__(self):
return self.trigger_name
class Meta:
verbose_name_plural = '定时任务触发器'
(2)修改定时任务的model:JobList ;只贴修改后的了:
class JobList(models.Model):
job_name = models.CharField('任务名称', max_length=25)
trigger = models.ForeignKey(JobListTrigger, on_delete=models.SET_NULL, null=True)
type_choices = (('date', 'date'), ('cron', 'cron'))
type_content = '''调度类型 对应 参数(执行频率) 例:<br/>
1、date:2019年8月30日 凌晨一点 执行任务<br/>参数值:2019-8-30 01:00:00 <br/>
2、cron:每天 凌晨两点 执行任务 (秒 分 时 日 月 星期 年)<br/>参数值:0 0 2 * * * *'''
action_type = models.CharField('调度类型', choices=type_choices, max_length=25, default='cron')
gender_choices = ((0, '停止'), (1, '启动'),)
job_state = models.IntegerField('任务状态', choices=gender_choices, default=0)
job_rate = models.CharField('执行频率', help_text=type_content, max_length=50)
time = models.IntegerField('执行次数', default=0)
def __str__(self):
return self.job_name
class Meta:
verbose_name_plural = '定时任务'
2、修改admin.py,只修改了两个按钮函数、重写了只读字段函数、新增任务触发器admin;
(1)设置修改页面的只读字段
def get_readonly_fields(self, request, obj=None):
if obj:
readonly_fields = ('job_state', 'trigger_name',)
else:
readonly_fields = ('job_state',)
return readonly_fields
(2)修改启动任务按钮:start_job,还剩省略些吧,可以去github看;
def start_job(self, request, queryset):
job_rate = rate.get('job_rate')
job_id = id.get('id')
trigger_name = JobList.objects.get(id=job_id).trigger.trigger_func
job_name = name.get('job_name')
job_type = type.get('action_type')
if hasattr(JobAction(), 'start_%s_job' % job_type):
func = getattr(JobAction(), 'start_%s_job' % job_type)
func(trigger_name, job_rate, str(job_id))
queryset.update(job_state=1)
else:
return messages.error(request, '【启动错误】 ' + job_name + ' 任务不存在!')
(3)修改启动任务按钮:stop_job,这个函数贴的很全;
def stop_job(self, request, queryset):
global job_name
name_list = queryset.values('job_name')
trigger_list = queryset.values('trigger')
id_list = queryset.values('id')
try:
for (trigger, id, name) in zip(trigger_list, id_list, name_list):
job_id = id.get('id')
trigger_name = JobList.objects.get(id=job_id).trigger.trigger_func
job_name = name.get('job_name')
JobAction.stop_job(trigger_name + '-' + str(job_id))
queryset.update(job_state=0)
return messages.success(request, (name_list[0].get('job_name') if len(name_list) == 1 else '所选') + " 任务已停止")
except JobLookupError as e:
logger.error(e)
return messages.error(request, job_name + " 任务不存在!")
(4)新增任务触发器admin:JobListTriggerAdmin
@admin.register(JobListTrigger)
class JobListTriggerAdmin(admin.ModelAdmin):
list_display = ('id', 'trigger_name', 'trigger_func', 'func_path', 'description')
list_display_links = ('id', 'trigger_name')
list_per_page = 20
list_max_show_all = 7
ordering = ('id',)
3、修改job_new.py,,这个文件中的所有关于定时任务的import就可以全部删除了
(1)修改start_date_job
@staticmethod
def start_date_job(trigger, job_rate, id):
exec(JobList.objects.get(id=id).trigger.func_path)
trigger_id = trigger + '-' + id
scheduler.add_job(eval(trigger), 'date', run_date=job_rate, id=trigger_id, args=[trigger_id, id, 'date'])
logger.info("%s start successfully" % trigger)
logger.info('任务池:' + str(scheduler.get_jobs))
(2)修改start_cron_job
@staticmethod
def start_cron_job(trigger, job_rate, id):
exec(JobList.objects.get(id=id).trigger.func_path)
rate = job_rate.split()
trigger_id = trigger + '-' + id
scheduler.add_job(eval(trigger), 'cron', second=rate[0], minute=rate[1], =rate[2], day=rate[3],
month=rate[4], day_of_week=rate[5], year=rate[6], id=trigger_id,
args=[trigger_id, id, 'cron'])
logger.info("%s start successfully" % trigger)
logger.info('任务池:' + str(scheduler.get_jobs))
3、wsgi.py
(1)这里只有一处修改
func(job_.trigger_name, job_.job_rate, str(job_.id))
logger.error('【启动错误】 ' + job_.trigger_name + '任务不存在!')
func(job_.trigger.trigger_func, job_.job_rate, str(job_.id))
logger.error('【启动错误】 ' + job_.trigger.trigger_func + '任务不存在!')
三、 截图看效果
四、结尾
问题:触发器列表大有问题,按照我的逻辑来说,应该在启动项目时扫描那个函数用到了job_before装饰器,这样就有了触发器的路径;现在这样手动添加的话,非开发人员有点看不懂 想明白了再来改
执行次数暂时还没用到,以后想好了再来写
ok,本章节就到这里了!
以上就是全部修改,也有可能没写全,还是去看源码比较好!私信我
希望有大佬提出问题
2022-07-30
|