| |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| -> Python知识库 -> python APScheduler模块 -> 正文阅读 |
|
|
[Python知识库]python APScheduler模块 |
简介一般来说?
$pip install apscheduler APScheduler的各个组件的关系, 如下图:
一般使用步骤:
除此之外, 可以监听事件, 执行自定义的函数 import datetime
from pytz import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.jobstores.memory import MemoryJobStore
job_stores = {
'default': MemoryJobStore()
}
executors = {
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
def hello_world():
print("hello world")
# 阻塞调度器
scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
# 在当前时间的3秒后, 触发执行hello_world, 详情见: "触发器与调度器API"
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone=timezone("Asia/Shanghai"))
scheduler.start()
调度器配置作业存储器和执行器可以在调度器中完成。例如添加、修改、移除作业,根据不同的应用场景,可以选择不同的调度器,可选择的调度器如下: # 阻塞式调度器 [ 调度器是你程序中唯一要运行的东西 ] from apscheduler.schedulers.blocking import BlockingScheduler # 后台调度器 [ 应用程序后台静默运行 ] from apscheduler.schedulers.background import BackgroundScheduler # AsyncIO调度器 [ 如果你的程序使用了 asyncio 库 ] from apscheduler.schedulers.asyncio import AsyncIOScheduler # Gevent调度器 [ 如果你的程序使用了 gevent 库 ] from apscheduler.schedulers.gevent import GeventScheduler # Tornado调度器 [ 如果你打算构建一个 Tornado 程序 ] from apscheduler.schedulers.tornado import TornadoScheduler # Twisted调度器 [ 如果你打算构建一个 Twisted 程序 ] from apscheduler.schedulers.twisted import TwistedScheduler # Qt调度器 [ 如果你打算构建一个 Qt 程序 ] from apscheduler.schedulers.qt import QtScheduler 在使用非阻塞的调度器时需要注意:?程序是否会退出从而无法执行任务 配置有3种方式配置 方式一 from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 键为名称
# 值为字典 实例化对象作为值, 参数直接在实例化时传入
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
job_defaults=job_defaults, timezone=utc)
方式二 from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
# 键为名称,值要为字典,type指定调度器, 其它键值对指定参数
jobstores = {
'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': {'type': 'threadpool', 'max_workers': 20},
'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方式三 from apscheduler.schedulers.background import BackgroundScheduler
# 前缀 "apscheduler." 是硬编码的
# apscheduler.jobstores指定任务存储器
# apscheduler.executors指定执行器
# 最后的 "." 指定名称
scheduler = BackgroundScheduler({
'apscheduler.jobstores.mongo': {
'type': 'mongodb'
},
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///jobs.sqlite'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.executors.processpool': {
'type': 'processpool',
'max_workers': '5'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'UTC',
})
执行器处理作业的运行,通常通过在作业中提交指定的可调用对象到一个线程或者进程池来进行,当作业完成时,执行器会将通知调度器 步骤:
# 线程池执行器 from apscheduler.executors.pool import ThreadPoolExecutor # 进程池执行器 from apscheduler.executors.pool import ProcessPoolExecutor # AsyncIO事件循环执行器 from apscheduler.executors.asyncio import AsyncIOExecutor # Gevent事件循环执行器 from apscheduler.executors.gevent import GeventExecutor # Tornado事件循环执行器 from apscheduler.executors.tornado import TornadoExecutor 默认ThreadPoolExecutor
使用例子 import datetime
from pytz import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
executors = {
'pool': ThreadPoolExecutor(max_workers=5)
}
def hello_world():
print("hello world")
scheduler = BlockingScheduler()
# 添加到配置文件
scheduler.configure(executors=executors)
# 指定执行器
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone=timezone("Asia/Shanghai"), executor="pool")
scheduler.start()
任务存储器存储被调度的作业,默认的作业存储器只是简单地把作业保存在内存中,其他的作业存储器则是将作业保存在数据库中,当作业被保存在一个持久化的作业存储器中的时候,该作业的数据会被序列化,并在加载时被反序列化,需要说明的是,?作业存储器不能共享调度器?。 步骤:
# 内存任务存储器 from apscheduler.jobstores.memory import MemoryJobStore # 使用SQLAlchemy ORM的任务存储器 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore # MongoDB任务存储器 from apscheduler.jobstores.mongodb import MongoDBJobStore # Redis任务存储器 from apscheduler.jobstores.redis import RedisJobStore # RethinkDB任务存储器 from apscheduler.jobstores.rethinkdb import RethinkDBJobStore # ZooKeeper任务存储器 from apscheduler.jobstores.zookeeper import ZooKeeperJobStore 默认MemoryJobStore 一般使用 import datetime
from pytz import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
job_stores = {
'redis': RedisJobStore()
}
def hello_world():
print("hello world")
scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone=timezone("Asia/Shanghai"), jobstore="redis")
scheduler.start()
SQLAlchemyJobStore使用 sqlalchemy + mysql """
SQLAlchemyJobStore(url=None, engine=None, tablename='apscheduler_jobs',
metadata=None, ..., tableschema=None, engine_options=None):
指定URL时,内部调用,create_engine
URL的字符串形式为 dialect[+driver]://user:password@host/dbname[?key=value..]
在哪里 dialect 是数据库名称,例如 mysql , oracle , postgresql 等,
以及 driver DBAPI的名称,例如 psycopg2 , pyodbc , cx_oracle 或者
# 使用DB API格式建立建立连接, 见PEP: https://www.python.org/dev/peps/pep-0249/
"""
import datetime
from pytz import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
MYSQL = {
"url": "mysql+pymysql://root:123456@localhost/test"
}
job_stores = {
'mysql': SQLAlchemyJobStore(**MYSQL)
}
def hello_world():
print("hello world")
scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone=timezone("Asia/Shanghai"), jobstore="mysql")
scheduler.start()
RedisJobStore使用 """
RedisJobStore(db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', ..., **connect_args)
调用 Redis(db=int(db), **connect_args)
Redis的参数:
host='localhost', port=6379,
db=0, password=None, socket_timeout=None,
socket_connect_timeout=None,
socket_keepalive=None, socket_keepalive_options=None,
connection_pool=None, unix_socket_path=None,
encoding='utf-8', encoding_errors='strict',
charset=None, errors=None,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
ssl_cert_reqs='required', ssl_ca_certs=None,
ssl_check_hostname=False,
max_connections=None, single_connection_client=False,
health_check_interval=0, client_name=None, username=None
"""
import datetime
from pytz import timezone
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.redis import RedisJobStore
REDIS = {
'host': '127.0.0.1',
'port': '6379',
'db': 0,
}
job_stores = {
'redis': RedisJobStore(**REDIS)
}
def hello_world():
print("hello world")
scheduler = BlockingScheduler()
scheduler.configure(jobstores=job_stores)
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone=timezone("Asia/Shanghai"), jobstore="redis")
scheduler.start()
其它自己查资料 全局配置from apscheduler.schedulers.blocking import BlockingScheduler
job_defaults = {
'coalesce': False, # 关闭聚合(coalescing)功能
'max_instances': 3, # 默认限制最大实例数为 3
"timezone": "UTC", # 调度器的时区
}
scheduler = BlockingScheduler()
scheduler.configure(job_defaults=job_defaults)
关于coalescing, 见:错过的作业执行以及合并操作 调度器API以下方法为调度器的API 添加任务使用? 例如: # ....
def hello_world():
print("hello_world")
scheduler = BlockingScheduler()
scheduler.add_job(hello_world, ...)
# ....
# ############## 或
# ...
scheduler = BlockingScheduler()
@scheduler.scheduled_job(...)
def hello_world():
print("hello_world")
# ...
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args)
触发器为空时, 立即执行 例子 import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def hello_world():
print("hello_world")
scheduler = BlockingScheduler()
scheduler.add_job(hello_world, "date", run_date=datetime.datetime.now() + datetime.timedelta(seconds=3),
timezone="Asia/Shanghai")
scheduler.start()
触发器 触发器中包含调度逻辑,每个作业都有自己的触发器来决定下次运行时间。除了它们自己初始配置以外,触发器完全是无状态的。
移除任务当从 scheduler 中移除一个 job 时,它会从关联的 job store 中被移除,不再被执行。 两种方法: job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
# 或使用ID
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
修改任务例子: job = scheduler.add_job(myfunc, 'interval', minutes=2, id="my_job_id")
job.modify(args=["lczmx", ]max_instances=6, name='Alternate name')
# 根据ID修改
scheduler.modify_job("my_job_id", args=["lczmx", ])
# 重新调度
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
暂停或恢复任务通过 Job 实例或者 scheduler 本身你可以轻易地暂停和恢复 job 。当一个 job 被暂停,它的下一次运行时间将会被清空,同时不再计算之后的运行时间,直到这个 job 被恢复。 from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print("Hello world")
scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")
# ################# 暂停 ###########
job.pause()
# 或
scheduler.pause_job("my_job_id")
# ################# 恢复 ###########
job.resume()
# 或
scheduler.resume_job("my_job_id")
scheduler.start()
查看任务信息from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print("Hello world")
scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")
# 获取某个任务的信息, 需要id, 可以指定job store
print(scheduler.get_job("my_job_id"))
# 获取全部任务信息列表, 可以指定job store
print(scheduler.get_jobs())
# 格式化输出任务信息, 可以指定job store
# !! 内部调用print
scheduler.print_jobs()
scheduler.start()
终止调度器# 一般使用 # 默认会等待 目前 正在执行 所有任务执行完 scheduler.shutdown() # 使用wait参数指定不等待 scheduler.shutdown(wait=False) 暂停/恢复调度器from apscheduler.schedulers.blocking import BlockingScheduler
def job_function():
print("Hello world")
scheduler = BlockingScheduler()
job = scheduler.add_job(job_function, "interval", seconds=2, timezone="Asia/shanghai", id="my_job_id")
# 休眠这个调度器
scheduler.pause()
# 恢复这个调度器
scheduler.resume()
# 使用 .start , 唤醒处于暂停状态的调度器
scheduler.start(paused=True)
scheduler.start()
添加事件你可以为 scheduler 绑定事件监听器(event listen)。Scheduler 事件在某些情况下会被触发,而且它可能携带有关特定事件的细节信息。 使用?
所有事件有如下表
与调度器相关事件:? code alias 与任务相关事件:? code job_id jobstore 向执行器提交任务的相关事件:? code job_id jobstore scheduled_run_times 任务执行在执行器的相关事件:? code job_id jobstore scheduled_run_time retval exception traceback 例子: from apscheduler.schedulers.blocking import Blockin gScheduler
from apscheduler.events import *
from apscheduler.events import SchedulerEvent
def my_listener(event):
if event.exception:
print('发生异常')
else:
print('任务已经执行')
def job_function():
print("Hello world")
scheduler = BlockingScheduler()
# 立即执行
job = scheduler.add_job(job_function, timezone="Asia/shanghai")
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()
故障排查如果 scheduler 没有如预期般正常运行,可以尝试将? 如果你还没有在一开始就将日志启用起来,那么你可以: import logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
这会提供 scheduler 运行时大量的有用信息。 最大允许实例默认情况下,每个任务同时只会有一个实例在运行。这意味着如果 一个任务到达计划运行时间点时,前一个任务尚未完成,那么这个 任务最近的一次运行计划将会 misfire(错过)。 可以通过在?添加任务?时指定? 错过的作业执行以及合并操作即: coalescing 有时候? 常见的原因是: 这个 任务是在持久化的? 这样,这个 任务 就被定义为 misfired (错过)。? 如果这个行为不符合你的实际需要,可以使用? 注意: 如果因为进程(线程)池中没有可用的进程(线程)而导致 任务的运行被推迟了,那么 执行器 会直接跳过它,因为相对于原计划的执行时间来说实在太 "晚" 了。 如果在你的应用程序中出现了这种情况,你可以增加 执行器的线程(进程)的数目,或者调整?
? ? |
|
|
|
|
| 上一篇文章 下一篇文章 查看所有文章 |
|
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
| 360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年11日历 | -2025/11/27 7:23:29- |
|
| 网站联系: qq:121756557 email:121756557@qq.com IT数码 |