IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> 【Python】Django-动态创建定时任务 -> 正文阅读

[Python知识库]【Python】Django-动态创建定时任务

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


前言

提示:这里可以添加本文要记录的大概内容:

在使用python的Django框架时,可以使用celery工具来定性定时任务的创建。可以帮助我们实现很多计划性任务。
某些情况下,我们会想要需要根据我们的函数传参实现动态生成定时任务,这是我们就可以使用第三方库django-celery-beat、django-celery-results来实现。示例以来Python的Django框架。


提示:以下是本篇文章正文内容,下面案例可供参考

一、定时任务原理

在我们使用django-celery-beat创建任务时,在启用celery-worker时(python manage.py celery worker -l info) 会在我们的数据库中生成记录信息。记录任务的时间与函数方法。
例如:
在这里插入图片描述
在创建任务完成后,celery会遍历数据表内容,django_celery_beat_periodictask记录创建的任务信息,django_celery_beat_crontabschedule、django_celery_beat_intervalschedule记录任务时间维度。

二、使用步骤

1.安装库

初始化框架时,将celery信息存入requirements.txt

Django==2.2.6
celery==4.4.0
django-celery-beat==2.0.0
django-celery-results==1.2.1

运行 pip install -r requirements.txt
进行数据库迁移
python manage.py migrate

2.创建任务

代码如下(示例):

import logging
from celery.task import periodic_task
from celery.schedules import crontab
from django_celery_beat.models import PeriodicTask,IntervalSchedule,CrontabSchedule
logger = logging.getLogger('root')

@periodic_task(run_every=crontab(minute='0', hour='0'))   # 每天凌晨开始执行定时任务
def operation_strategy_bkjob():
	# work_time 自定-此示例用来指明任务时间 
    work_time = [["04:00:00","04:00:00"],["20:00:00","20:00:00"],["11:10:00","16:00:00"]]
    for time_ in work_time:
        enter_time = time_[0]
        enter_time_hour = int(enter_time.split('-')[0])
        enter_time_minute = int(enter_time.split('-')[1])
        end_time = time_[-1]
        end_time_hour = int(end_time.split('-')[0])
        end_time_minute = int(end_time.split('-')[-1])
        # 创建当天的定时任务(开始)
        bool_enter_task,enter_task_name = enter_task(enter_time_hour,enter_time_minute,task_info='homen_application.views.operation_task',strategy_id=strategy_id,enabled=True,strategy_name=strategy_object.name)
        if bool_enter_task == False:
            logger.error("<创建任务失败>[}".format(enter_task_name))
        # 创建当天的定时任务(结束)
        bool_stop_task,stop_task_name = enter_task(end_time_hour,end_time_minute,task_info='homen_application.views.operation_task',strategy_id=strategy_id,enabled=False,strategy_name=strategy_object.name)
        if bool_stop_task == False:
            logger.error("<停止任务失败>[}".format(stop_task_name))
	logger.info('<---{}:创建作业平台当天定时任务完成--->'.format(today_str))


def enter_task(hour_,minute_,task_info,strategy_id,strategy_name, enabled,**kwargs):
    """创建定时任务"""
    try:
        task_name= strategy_name + "-" + hour_ + "-" + minute_
        # 定时任务的时间
        crontab, _ = CrontabSchedule.objects.update_or_create(
            minute=minute_,
            hour=hour_,
            day_of_week="*",
            day_of_month='*',
            month_of_year='*',
        )
        # 定时任务信息
        PeriodicTask.objects.update_or_create(
            defaults={
            "crontab": crontab,  # 使用的时间维度,即什么时间执行这个任务
            "task": task_info,  # 任务用到的函数方法
            # "args":json.dumps([strategy_id, enabled]),
            "kwargs": json.dumps({"strategy_id":strategy_id, "enabled":enabled}, ensure_ascii=False) # 传入的参数
            },
            name=task_name, # 任务名称
        )
        return True,task_name
    except Exception as e:
        logger.error('<---创建定时任务失败--->{}'.format(e))
        return False,str(e)

该代码片段可实现在每天00:00:00进行任务创建,任务执行时间有参数work_time控制,任务使用方法的参数由 enter_task 中args与kwargs控制。

3.停止、删除任务操作

def stop_task(hour_,minute_,task_info,is_delete,**kwargs):
    """终止定时任务"""
    try:
        # 定位任务
        tasks = PeriodicTask.objects.filter(name=task_info)
        # 停止任务
        if tasks:
            tasks.update(enabled=False)
            # 删除任务
            if is_delete:
                tasks.delete()
        return True,'success'
    except Exception as e:
        return False,str(e)

总结

以上为对第三方库django-celery-beat2.0.0
django-celery-results
1.2.1的一个简单实用记录。

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-10-22 21:11:19  更:2022-10-22 21:15:04 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/26 2:46:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计