前言
django-celery-beat 可以支持定时任务,把定时任务写到数据库。 接着前面这篇写python测试开发django-196.python3.8+django2+celery5.2.7环境准备 django-celery-beat 一般结合 django-celery-results一起使用
环境准备
运行系统:linux(centos/debian/ubuntu),不支持windows Python版本:3.8.5 Django : 2.2.2 celery: 5.2.7 django-celery-results2.4.0 django-celery-beat2.3.0
使用pip安装celery5.2.7版本
pip install celery==5.2.7
安装django-celery-results库:
pip install django-celery-results==2.4.0
安装django-celery-beat
pip install django-celery-beat==2.3.0
定时任务配置
Django 项目中settings.py:
INSTALLED_APPS = (
...,
'django_celery_results',
'django_celery_beat',
)
将django_celery_beat模块和django-celery-results相关配置,写到setting.py
# # RabbitMQ配置BROKER_URL 和backend
CELERY_BROKER_URL = 'amqp://admin:123456@127.0.0.1:5672//'
# # RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'django-db'
# # SCHEDULER 定时任务保存数据库
# 将任务调度器设为DatabaseScheduler
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
# CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
# CELERY_RESULT_EXPIRES = xx
CELERY_TASK_RESULT_EXPIRES = 60*60*24 # 后端存储的任务超过一天时,自动删除数据库中的任务数据,单位秒
CELERY_MAX_TASKS_PER_CHILD = 1000 # 每个worker执行1000次任务后,自动重启worker,防止任务占用太多内存导致内存泄漏
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TIMEZONE = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False
应用Django数据库迁移,创建相关的表
python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat
django_celery_results生成3张表
django_celery_beat生成6张表
相关表的说明 django_celery_beat.models.ClockedSchedule # 此模型存放已经关闭的任务 django_celery_beat.models.CrontabSchedule # 与像在cron项领域的时间表 分钟小时日的一周 DAY_OF_MONTH month_of_year django_celery_beat.models.IntervalSchedule # 以特定间隔(例如,每5秒)运行的计划。 django_celery_beat.models.PeriodicTask # 此模型定义要运行的单个周期性任务。 django_celery_beat.models.PeriodicTasks # 此模型仅用作索引以跟踪计划何时更改 django_celery_beat.models.SolarSchedule # 定制任务
实例参考
在views视图中,主要用到CrontabSchedule 和PeriodicTask
from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json
class CreateTaskView(APIView):
"""创建任务视图"""
def post(self, request):
"""创建任务 接口传参,示例
{
"task_name": "任务名称",
"arg1": "参数1",
"arg2": "参数2",
"task_cron": "*/2 * * * *"
}
"""
task_name = request.data.get("task_name")
task_kwargs = {
"task_name": task_name,
"arg1": request.data.get("arg1"),
"arg2": request.data.get("arg2"),
}
# 定时任务规则
cron_value = request.data.get("task_cron")
cro_list = str(cron_value).split(' ')
if len(cro_list) != 5:
return Response({"code": 3003, "msg": "task_cron 不合法"})
cron_time = {
'minute': cro_list[0], # 每2分钟执行一次
'hour': cro_list[1],
'day_of_week': cro_list[2],
'day_of_month': cro_list[3],
'month_of_year': cro_list[4]
}
# 写入 schedule表
schedule = CrontabSchedule.objects.create(**cron_time)
# 任务和 schedule 关联
task_obj = PeriodicTask.objects.filter(name=task_name)
if task_obj:
return JsonResponse({"code": 3000, "msg": "task name exist"})
task_obj, created = PeriodicTask.objects.get_or_create(
name=task_name, # 名称保持唯一
task="app.tasks.task_demo", # 任务的注册路径
crontab=schedule,
enabled=True, # 是否开启任务
# args=json.dumps(task_args),
kwargs=json.dumps(task_kwargs),
# 任务过期时间,设置当前时间往后1天
expires=datetime.datetime.now() + datetime.timedelta(days=1)
)
if created:
return JsonResponse({"code": 0, "msg": "success"})
else:
return JsonResponse({"code": 111, "msg": "create failed"})
其中 task="app.tasks.task_demo" 是任务的注册路径,比如django下有自己的app,在app下创建了一个tasks.py文件
# Create your tasks here
from celery import shared_task
@shared_task
def add(x, y):
return x + y
启动worker与beat
最后启动worker与beat
celery -A proj worker -l info
celery -A proj beat -l info
linux 后台启动使用 supervisord 后台启动celery 服务(worker/beat) 相关教程参考这篇https://www.cnblogs.com/yoyoketang/p/16458925.html
|