IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python——常见定时任务的错误实现 -> 正文阅读

[Python知识库]python——常见定时任务的错误实现

摘要

有些时候我们需要每隔一段时间就要执行一段程序,或者是往复循环执行某一个任务。例如:磁盘的清理工作,垃圾文件的删除,同时对于过期镜像的管理等……都是需要使用定时任务的。网上看到很多的有关于python定时任务编写都是错误的,博文将给大家展示错误的使用,同时介绍正确的python中常用定时任务实现方法,帮助大家在日常工作学习和使用。

一、可用的python定时任务

1.1 APScheduler实现定时任务

APScheduler(advanceded python scheduler)基于Quartz的一个Python定时任务框架,实现了Quartz的所有功能,使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务,并且可以持久化任务。基于这些功能,我们可以很方便的实现一个Python定时任务系统。 它有以下三个特点:

  • 类似于 Liunx Cron 的调度程序(可选的开始/结束时间)

  • 基于时间间隔的执行调度(周期性调度,可选的开始/结束时间)

  • 一次性执行任务(在设定的日期/时间运行一次任务)

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
    print(datetime.now.strftime('%Y-%m-%d %H:%M:%S'))
 
sched=BlockingScheduler()
sched.add_job(job,"interval",second=5,id="my_job")
sched.start()

1.2?利用调度模块schedule实现定时任务

schedule是一个第三方轻量级的任务调度模块,可以按照秒,分,小时,日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。

import schedule
import time

def job():
    print("i am working…………")
    
schedule.every(10).seconds.do(job)

schedule.every(10).minute.do(job)

schedule.every().hour.do(job)

schedule.every().day.at("10.30").do(job)

schedule.every().monday.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

1.3? Apache Airflow实现定时任务

1.4 Celery实现定时任务

二、错误的定时任务实现

2.1 True: + sleep()导致性能低

import datetime
import time

def time_printer():
    """
    只能设定间隔,不能指定具体的时间,比如每天早上8:00
    sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。
    :return:
    """
    now=datetime.datetime.now()
    ts=now.strftime('%Y-%m-%d %H:%M:%S')
    print(f'do fun time:{ts}')


def loop_monitor():
    while True:
        time_printer()
        time.sleep(5)

if __name__ == '__main__':
    loop_monitor();

虽然该程序可以实现,但是sleep 是一个阻塞函数,也就是说 sleep 这一段时间,程序什么也不能操作。

2.2 Timeloop() 只能运行一次

pip install TimeLoop
import time
from timeloop import Timeloop
from datetime import timedelta

tl = Timeloop()


@tl.job(interval=timedelta(seconds=2))
def sample_job_every_2s():
    print("2s job current time : {}".format(time.ctime()))


@tl.job(interval=timedelta(seconds=5))
def sample_job_every_5s():
    print("5s job current time : {}".format(time.ctime()))


@tl.job(interval=timedelta(seconds=10))
def sample_job_every_10s():
    print("10s job current time : {}".format(time.ctime()))

if __name__ == '__main__':
  sample_job_every_10s()

2.3 利用threading.Timer导致OOM错误

threading 模块中的 Timer 是一个非阻塞函数,比 sleep 稍好一点,timer最基本理解就是定时器,我们可以启动多个定时任务,这些定时器任务是异步执行,所以不存在等待顺序执行问题。 Timer(interval, function, args=[ ], kwargs={ })

  • interval: 指定的时间

  • function: 要执行的方法

  • args/kwargs: 方法的参数

import datetime
from threading import Timer

def time_printer():
    now=datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print(f'do fun time:{ts}')
    loop_monitor()

def loop_monitor():
    t=Timer(5,time_printer())
    t.start()
    

2.4 sched导致OOM错误

sched模块实现了一个通用事件调度器,在调度器类使用一个延迟函数等待特定的时间,执行任务。同时支持多线程应用程序,在每个任务执行后会立刻调用延时函数,以确保其他线程也能执行。 class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口,它需要外部传入两个参数,timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time),delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。

import time
import sched
import datetime
def time_printer():
    now=datetime.datetime.now()
    ts = now.strftime('%Y-%m-%d %H:%M:%S')
    print(f'do fun time:{ts}')
    loop_monitor()

def loop_monitor():
    # 生成调度器
    job=sched.scheduler(time.time,time.sleep)
    job.enter(5,1,time_printer(),())
    job.run()
    
if __name__ == '__main__':
    loop_monitor()

scheduler对象主要方法:

  • enter(delay, priority, action, argument),安排一个事件来延迟delay个时间单位。

  • -cancel(event):从队列中删除事件。如果事件不是当前队列中的事件,则该方法将跑出一个ValueError。

  • -run():运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数),然后执行事件,直到不再有预定的事件。

博文参考

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-05-21 18:55:28  更:2022-05-21 18:57:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 14:44:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码