IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python测试开发django-197.django-celery-beat 定时任务 -> 正文阅读

[Python知识库]python测试开发django-197.django-celery-beat 定时任务

前言

django-celery-beat 可以支持定时任务,把定时任务写到数据库。
接着前面这篇写python测试开发django-196.python3.8+django2+celery5.2.7环境准备
django-celery-beat 一般结合 django-celery-results一起使用

环境准备

运行系统:linux(centos/debian/ubuntu),不支持windows
Python版本:3.8.5
Django : 2.2.2
celery: 5.2.7
django-celery-results2.4.0
django-celery-beat
2.3.0

使用pip安装celery5.2.7版本

pip install celery==5.2.7

安装django-celery-results库:

pip install django-celery-results==2.4.0

安装django-celery-beat

pip install django-celery-beat==2.3.0

定时任务配置

Django 项目中settings.py:

INSTALLED_APPS = (
    ...,
    'django_celery_results',
    'django_celery_beat',
)

将django_celery_beat模块和django-celery-results相关配置,写到setting.py

# #  RabbitMQ配置BROKER_URL 和backend
CELERY_BROKER_URL = 'amqp://admin:123456@127.0.0.1:5672//'
# # RESULT_BACKEND 结果保存数据库
CELERY_RESULT_BACKEND = 'django-db'


# # SCHEDULER 定时任务保存数据库
# 将任务调度器设为DatabaseScheduler
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# 为任务设置超时时间,单位秒。超时即中止,执行下个任务。
# CELERY_TASK_TIME_LIMIT = 5
# 为存储结果设置过期日期,默认1天过期。如果beat开启,Celery每天会自动清除。
# 设为0,存储结果永不过期
# CELERY_RESULT_EXPIRES = xx
CELERY_TASK_RESULT_EXPIRES = 60*60*24  # 后端存储的任务超过一天时,自动删除数据库中的任务数据,单位秒
CELERY_MAX_TASKS_PER_CHILD = 1000  # 每个worker执行1000次任务后,自动重启worker,防止任务占用太多内存导致内存泄漏

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TIMEZONE = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False

应用Django数据库迁移,创建相关的表

python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat

django_celery_results生成3张表

django_celery_beat生成6张表

相关表的说明
django_celery_beat.models.ClockedSchedule # 此模型存放已经关闭的任务
django_celery_beat.models.CrontabSchedule # 与像在cron项领域的时间表 分钟小时日的一周 DAY_OF_MONTH month_of_year
django_celery_beat.models.IntervalSchedule # 以特定间隔(例如,每5秒)运行的计划。
django_celery_beat.models.PeriodicTask # 此模型定义要运行的单个周期性任务。
django_celery_beat.models.PeriodicTasks # 此模型仅用作索引以跟踪计划何时更改
django_celery_beat.models.SolarSchedule # 定制任务

实例参考

在views视图中,主要用到CrontabSchedule 和PeriodicTask

from django_celery_beat.models import PeriodicTask, CrontabSchedule
import json


class CreateTaskView(APIView):
    """创建任务视图"""

    def post(self, request):
        """创建任务 接口传参,示例
        {
           "task_name": "任务名称",
           "arg1": "参数1",
           "arg2": "参数2",
           "task_cron": "*/2 * * * *"
        }
        """
        task_name = request.data.get("task_name")
        task_kwargs = {
            "task_name": task_name,
            "arg1": request.data.get("arg1"),
            "arg2": request.data.get("arg2"),
        }
        # 定时任务规则
        cron_value = request.data.get("task_cron")
        cro_list = str(cron_value).split(' ')
        if len(cro_list) != 5:
            return Response({"code": 3003, "msg": "task_cron 不合法"})
        cron_time = {
            'minute': cro_list[0],  # 每2分钟执行一次
            'hour': cro_list[1],
            'day_of_week': cro_list[2],
            'day_of_month': cro_list[3],
            'month_of_year': cro_list[4]
        }
        # 写入 schedule表
        schedule = CrontabSchedule.objects.create(**cron_time)
        # 任务和 schedule 关联
        task_obj = PeriodicTask.objects.filter(name=task_name)
        if task_obj:
            return JsonResponse({"code": 3000, "msg": "task name exist"})
        
        task_obj, created = PeriodicTask.objects.get_or_create(
            name=task_name,  # 名称保持唯一
            task="app.tasks.task_demo",  # 任务的注册路径
            crontab=schedule,
            enabled=True,  # 是否开启任务
            # args=json.dumps(task_args),
            kwargs=json.dumps(task_kwargs),
            # 任务过期时间,设置当前时间往后1天
            expires=datetime.datetime.now() + datetime.timedelta(days=1)
        )
        if created:
            return JsonResponse({"code": 0, "msg": "success"})
        else:
            return JsonResponse({"code": 111, "msg": "create failed"})

其中 task="app.tasks.task_demo" 是任务的注册路径,比如django下有自己的app,在app下创建了一个tasks.py文件

# Create your tasks here
from celery import shared_task


@shared_task
def add(x, y):
    return x + y

启动worker与beat

最后启动worker与beat

celery -A proj worker -l info
celery -A proj beat -l info

linux 后台启动使用 supervisord 后台启动celery 服务(worker/beat)
相关教程参考这篇https://www.cnblogs.com/yoyoketang/p/16458925.html

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-07-20 18:47:30  更:2022-07-20 18:50:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/14 14:58:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码