IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> celery分布式任务队列从入门到精通 -> 正文阅读

[大数据]celery分布式任务队列从入门到精通

目录

1. Celery简介

2. 安装Celery

3. 安装RabbitMQ或Redis

3.1 安装redis

3.2 安装RabbitMQ

4. 第一个Celery程序

5. 第一个Celery工程项目

6. Celery 框架

7. celery 路由任务分配队列

8. Celery Beat自动任务调度

9. Celery 远程调用


1. Celery简介

? ? Celery是由纯python编写的,但是协议可以用任何语言实现。目前,已有Ruby实现的RCelery、Node.js实现的node-celery及一个PHP客户端,语言互通也可以通过using webhooks实现。在使用Celery之前,我们先来了解以下几个概念:

? ? 任务队列:简单来说,任务队列就是存放着任务的队列,客户端将要执行的任务的消息放入任务队列中,执行节点worker进行持续监视队列,如果有新任务,就取出来执行该任务。这种机制就像生产者消、费者模型一样,客户端作为生产者,执行节worker点作为消费者,它们之间通过任务队列进行传递,如图:

? ? ?中间人(broker):Celery用于消息通信,通常使用中间人(broker)在客户端和worker之间传递,这个过程从客户端(生产者)向队列添加消息开始,之后中间人把消息派送给worker(消费者)。官方给出的实现broker的工具如下:

名称状态监视远程控制
RabbitMQ稳定
Redis稳定
MongoDB实验性
Beanstalk实验性
AmazonSQS实验性
Zookeeper实验性
DjangoDB实验性
SQLAlchemy实验性
CouchDB实验性
Iron MQ第三方

????????提示: 在实际的使用中,推荐使用RabbitMQ或者Redis作为broker

  • ? ? ? ? 任务生产者:调用Celery提供的API、函数、装饰器产生任务并交给任务队列的都是任务生产者。
  • ? ? ? ? 执行单元worker:属于任务队列的消费者,持续地监控任务队列,当队列中有新的任务时,便取出来执行。
  • ? ? ? ? 任务结果存储backend:用来存储worker执行任务的结果,Celery支持不同形式的存储任务结果,包含Redis,MongoDB等。
  • ? ? ? ? 任务调度器beat:Celery Beat进程会读取配置文件的内容,周期性地将配置中需要到期执行的任务发送到任务队列执行。

Celery的特性:

  • ? ? ? ? 高可用:如果连接丢失或失败,worker和客户端就会自动重试,并且中间人broker通过主/主,主/从方式来提高可用性。
  • ? ? ? ? 快速:单个Celery进程每分钟执行数以百万计的任务,且保持往返延迟在亚毫秒级,可以选择多进程、Gevernt等并发执行。
  • ? ? ? ? 灵活:Celery几乎所有模块都可以扩展或单独使用。可以自制连接池,日志、调度器、消费者、生产者等等。
  • ? ? ? ? 框架集成:Celery易于和web框架集成,如django-celery,web2py-celery、tornado-celery等等。
  • ? ? ? ? 强大的调度功能:Celery Beat进程来实现强大的调度功能,可以指定任务在若干秒后或一个时间点来执行,也可以基于单纯的时间间隔或支持分钟、小时、每周的第几天、每月的第几天等等,用crontab表达式来使用周期任务调度。
  • ? ? ? ? 易监控:可以方便地查看定时任务的执行情况,如执行是否成功,当前状态、完成任务花费时间等,还可以使用功能完备的管理后台或命令行添加、更新、删除任务,提供了完善的错误处理机制。

2. 安装Celery

?推荐使用pip安装Celery,方式如下:

pip3 install celery

# 或者,该方式安装celery时,捆绑了一组特性依赖:librabbitmq,redis,auth,msgpack
pip3 install celery[librabbitmq,redis,auth,msgpack]

以下是可用的捆绑,供使用时做参考

序列化

celery[auth]:用于使用auth安全序列化程序。

celery[msgpack]:用于使用 msgpack 序列化程序。

celery[yaml]:用于使用 yaml 序列化程序。

并发

celery[eventlet]:用于使用eventlet池。

celery[gevent]:使用gevent池。

传输和后端

celery[librabbitmq]:用于使用 librabbitmq C 库。

celery[redis]:使用 Redis 作为消息传输或结果后端。

celery[sqs]:使用 Amazon SQS 作为消息传输(实验性)。

celery[tblib]:用于使用该task_remote_tracebacks功能。

celery[memcache]:使用 Memcached 作为结果后端(使用pylibmc

celery[pymemcache]:使用 Memcached 作为结果后端(纯 Python 实现)。

celery[cassandra]:使用 Apache Cassandra 作为 DataStax 驱动程序的后端。

celery[couchbase]:使用 Couchbase 作为结果后端。

celery[arangodb]:使用 ArangoDB 作为结果后端。

celery[elasticsearch]:使用 Elasticsearch 作为结果后端。

celery[riak]:使用 Riak 作为结果后端。

celery[dynamodb]:使用 AWS DynamoDB 作为结果后端。

celery[zookeeper]:使用 Zookeeper 作为消息传输。

celery[sqlalchemy]:使用 SQLAlchemy 作为结果后端(支持)。

celery[consul]:使用 Consul.io 键/值存储作为消息传输或结果后端(实验性)。

celery[django]:指定 Django 支持可能的最低版本。

使用源代码安装如下:(celery · PyPI

# 下载源代码文件
wget https://files.pythonhosted.org/packages/66/60/2713f5be1906b81d40f823f4c30f095f7b97b9ccf3627abe1c79b1e2fd15/celery-5.1.2.tar.gz

# 解压
tar zxvf celery-5.1.2.tar.gz

# 进入目录
cd celery-5.1.2

# 构建
python3 setup.py build

# 安装,注意权限,可在前面添加sudo
python3 setup.py install

3. 安装RabbitMQ或Redis

3.1 安装redis

????????本文将以redis作为broker

????????以Ubuntu为例,其他操作系统可参考RabbitMQ官网:Downloading and Installing RabbitMQ — RabbitMQ

????????在Ubuntu系统安装redis可以使用一下命令

sudo apt-get update
sudo apt-get install redis-server

? ? ? ? 启动redis

redis-server

? ? ? ? ?查看redis是否启动

redis-cli

? ? ? ? 上面的命令将打开以下终端

redis 127.0.0.1:6379>

? ? ? ? 其中127.0.0.1是本机ip,6379是redis服务端口号,现在输入ping命令:

redis 127.0.0.1:6379> ping
PONG

? ? ? ? ?以上说明redis已经安装成功。以下是通过源码包安装redis

wget http://download.redis.io/releases/redis-6.0.6.tar.gz
tar xzf redis-6.0.6.tar.gz
cd redis-6.0.6
make

? ? ? ? make命令执行完,在redis-6.0.6/src目录下会出现编译后的Redis服务程序redis-server和启动客户端程序redis-cli

????????如下命令启动Redis,此命令会一直处于占用状态,我们再重新开一个命令行连接

cd redis-6.0.6/src

./redis-server ../redis.conf

?????????注意:如果redis-server 后面指定配置文件,则会以默认的配置启动redis服务。此处我们是使用的指定的默认redis配置文件。也可以根据需要使用自己的配置文件。

? ? ? ? 启动redis服务后,显示如下:

????????

? ? ? ? 以上表示启动成功,可以使用测试客户端程序redis-cli和redis进行交互了,例如:

# 有$ 的一行表示shell命令
$ cd src
$ ./redis-cli
redis> set foo bar
OK
redis> get foo
"bar"

? ? ? ? ?配置celery的BROKER_URL,redis的默认连接URL如下:

BROKER_URL = 'redis://localhost:6379/0'

3.2 安装RabbitMQ

? ? ? ? 这里仍以Ubuntu为例

? ? ? ?Centos7.6系统参考:CentOS7.6 安装RabbitMQ_大帅的博客-CSDN博客

????????首先安装erlang。由于RabbitMQ需要Erlang语言的支持,因此需要先安装Erlang,执行命令:

sudo apt-get install erlang-nox

? ? ? ? ?再安装RabbitMQ

sudo apt-get update
sudo apt-get install rabbitmq-server

? ? ? ? 启动、关闭、重启、状态RabbitMQ服务的命令如下:

# 启动
sudo rabbitmq-server start

# 关闭
sudo rabbitmq-server stop

# 重启
sudo rabbitmq-server restart

# 查看rabbitmq状态
sudo rabbitmqctl status

? ? ? ? 要使用celery,需要创建一个RabbitMQ用户和虚拟主机,并且允许用户访问改虚拟主机。

# 创建rabbitmq的用户名为myuser,密码为mypassword,请自行设置
sudo rabbitmqctl add_user myuser mypassword

# 创建虚拟主机
sudo rabbitmqctl add_vhost myvhost

# 设置权限
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

? ? ? ? RabbitMQ是默认的中间人的URL位置,生产环境根据实际情况修改即可。

BROKER_URL = 'amqp://guest:guest@localhost:5672'

4. 第一个Celery程序

? ? ? ? 我们选redis作为broker,首先要修改一下redis的配置文件redis.conf,修改bind=127.0.0.1为bind=0.0.0.0,意思是允许远程访问Redis数据库。修改完毕需要重启一下redis服务。

# sudo apt-get 方式安装重启
service redis-server restart

# 源码安装重启
src/redis-server ../redis.conf

? ? ? ? 启动成功后检查:

[root@python celery_demo]# ps -elf | grep redis
0 S root      38987 110789  0  80   0 - 28203 pipe_w 06:19 pts/2    00:00:00 grep --color=auto redis
4 S root     104561   2276  0  80   0 - 40606 ep_pol 04:00 pts/0    00:00:23 src/redis-server 127.0.0.1:6379

? ? ? ? 说明已成功启动。

????????现在来编写一个Celery程序

【示例1】(my_first_celery.py)

# encoding=utf-8

from celery import Celery
import time

app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0'
)


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    res = x + y
    print(f"x + y = {res}")
    return res

代码说明

? ? ? ? Celery()的第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称;第二个参数指定了中间人broker,第三个参数指定了后端存储。实现了一个add函数,该函数模拟了耗时操作,等待3秒,传入两个参数并返回之和,使用app.task来装饰该函数。

? ? ? ? 接下来我们启动任务执行单元worker。

celery -A my_first_celery worker -l info

命令说明:

? ? ? ? -A 表示程序段模块名称,worker表示启动一个执行单元,-l是指-level,表示打印的日志等级,可以使用celery -help命令查看celery命令的帮助文档。

? ? ? ? 启动成功后显示如下:

? ? ? ? ?如果不想用celery命令启动worker,则可以直接使用文件驱动,修改my_first_celery.py如下所示:

【实例2】使用入口函数启动my_first_celery.py

?添加了app.start()启动

# encoding=utf-8

from celery import Celery
import time

app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/0',
    backend='redis://127.0.0.1:6379/0'
)


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    res = x + y
    print(f"x + y = {res}")
    return res


if __name__ == '__main__':
    app.start()

? ? ? ? 然后再命令中执行python3 my_first_celery.py worker即可,启动后的界面和使用celery命令的结果是一致的。

接下来,编写任务调度程序:start_task.py

from my_first_celery import add  # 导入任务函数add
import time

# delay异步调用,因为add函数里面会等待3秒,这里调用不会阻塞,程序会立即向下执行
result = add.delay(12, 12)

# ready方法检查任务是否执行完毕,此处会循环检查
while not result.ready():
    print(time.strftime("%H:%M:%S"))
    time.sleep(1)

print(result.get())         # 获取任务返回的结果,也就是两个数相加之和
print(result.successful())  # 判断任务是否成功执行

????????执行?python3 start_task.py 得到以下结果:

[root@python celery_demo]# python3 start_task.py
06:48:05
06:48:06
06:48:07
24
True

? ? ? ? 等待了3秒后(有可能会打印4次秒数),任务返回了24,并且成功完成。此时worker界面增加的信息如下:

[2021-09-11 06:48:05,236: INFO/MainProcess] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] received
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8] x + y = 24
[2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8]

[2021-09-11 06:48:08,244: INFO/ForkPoolWorker-8] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] succeeded in 3.007353223998507s: 24

? ? ? ? 启动 41425cd6-63c8-41df-bb74-74fd0c5c7438 是 taskid?,只要指定了backend,根据这id就可以随时去backend查找运行结果。使用方法如下:

>>> from my_first_celery import add
>>> taskid='41425cd6-63c8-41df-bb74-74fd0c5c7438'
>>> add.AsyncResult(taskid).get()
24
>>>

? ? ? ? 或者

>>> from celery.result import AsyncResult
>>> AsyncResult(taskid).get()
24

5. 第一个Celery工程项目

? ? ? ? 上面的celery程序非常简单,实际的项目开发应该是模块化的,程序的功能分散在对个主文件中,Celery也不例外。下面扩展第一个Celery程序。

? ? ? ? 新建myCeleryProj目录,并在目录中新建__init__.py、app.py、settings.py、tasks.py文件,其中__init__.py保持为空即可,其作用是把myCeleryProj目录当成一个python包,让python程序导入。

【示例3】第一个工程项目(myCeleryPro)

? ? ? ? setting.py存放配置信息,如下所示:(更多配置项查看官网文档:First Steps with Celery — Celery 5.2.0b3 documentation

# 使用redis 作为消息代理
broker_url = 'redis://127.0.0.1:6379/0'

# 任务结果存在Redis
result_backend = 'redis://127.0.0.1:6379/0'

# 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
result_serializer = 'json'

? ? ? ? app.py是celery worker是入口,如下所示:

# 引用标准库,而不是当前目录下的同名文件。
from __future__ import absolute_import

from celery import Celery

# 初始化app,导入任务
app = Celery("myCeleryProj", include=["myCeleryProj.tasks"])

# 加载配置
app.config_from_object("myCeleryProj.settings")

if __name__ == "__main__":
    app.start()

? ? ? ? tasks.py主要存放具体执行的任务,如下所示:

from myCeleryProj.app import app
import time


@app.task
def add(x, y):
    time.sleep(3)  # 模拟耗时操作
    s = x + y
    print(f"x + y = {s}")
    return s


@app.task
def taskA():
    print("taskA")
    time.sleep(3)


@app.task
def taskB():
    print("taskB")
    time.sleep(3)

? ? ? ? ?在myCeleryProj的同级目录下执行如下命令,运行工程项目。

celery -A myCeleryProj.app worker -c 3 -l info

? ? ? ? -c 3 表示启用三个子进程执行该队列中的任务。运行结果如下:

?????????也可以设置后台运行并指定日志文件

celery -A myCeleryProj.app worker -logfile /tmp/celery -l info -c 3

? ? ? ? ?现在我们已经启动了worker,从运行的打印输出可以看到有三个任务:

[tasks]
  . myCeleryProj.tasks.add
  . myCeleryProj.tasks.taskA
  . myCeleryProj.tasks.taskB

? ? ? ? 接下来手动执行异步调用。

>>> from myCeleryProj.tasks import *
>>> add.delay(5,6);taskA.delay();taskB.delay()
<AsyncResult: b2729c4e-84f9-4247-b3aa-79a71d746e41>
<AsyncResult: 12ac1a74-08b3-40a3-91be-d096de9d5651>
<AsyncResult: 879dbfb7-f216-4e46-b83f-020985b7efec>
>>>

? ? ? ? 这里的add.delay(5,6);taskA.delay();taskB.delay()写在一行是在于同时发出异步执行的命令,worker界面新增的信息如下:

[2021-09-11 10:34:50,423: INFO/MainProcess] Task myCeleryProj.tasks.add[b2729c4e-84f9-4247-b3aa-79a71d746e41] received
[2021-09-11 10:34:50,424: INFO/MainProcess] Task myCeleryProj.tasks.taskA[12ac1a74-08b3-40a3-91be-d096de9d5651] received
[2021-09-11 10:34:50,426: WARNING/ForkPoolWorker-1] taskA
[2021-09-11 10:34:50,426: WARNING/ForkPoolWorker-1]

[2021-09-11 10:34:50,426: INFO/MainProcess] Task myCeleryProj.tasks.taskB[879dbfb7-f216-4e46-b83f-020985b7efec] received
[2021-09-11 10:34:50,428: WARNING/ForkPoolWorker-3] taskB
[2021-09-11 10:34:50,428: WARNING/ForkPoolWorker-3]

[2021-09-11 10:34:53,429: WARNING/ForkPoolWorker-2] x + y = 11
[2021-09-11 10:34:53,430: WARNING/ForkPoolWorker-2]

[2021-09-11 10:34:53,453: INFO/ForkPoolWorker-2] Task myCeleryProj.tasks.add[b2729c4e-84f9-4247-b3aa-79a71d746e41] succeeded in 3.028609608998522s: 11
[2021-09-11 10:34:53,453: INFO/ForkPoolWorker-1] Task myCeleryProj.tasks.taskA[12ac1a74-08b3-40a3-91be-d096de9d5651] succeeded in 3.027193174002605s: None
[2021-09-11 10:34:53,453: INFO/ForkPoolWorker-3] Task myCeleryProj.tasks.taskB[879dbfb7-f216-4e46-b83f-020985b7efec] succeeded in 3.0251767819972883s: None

? ? ? ? 可以看出,worker在10:34:50同时接收到了三个任务,由于并发数是3,且三个任务都执行了time.sleep(3)等待3秒的耗时操作,因此他们都在10:34:53打印了相应的信息并退出。大家可以将并发数设置为1再试验一下运行结果。

? ? ? ? 调用任务task有三种方法:

? ? ? ? (1)apply_async()方法,发送一个task到任务队列,支持更多的参数控制,如:add.apply_async(countdown=10, expires=120)表示执行add函数的时间限制最多为10秒,add函数的有效期为120秒。使用apply_async还支持回调,例如:

add.apply_async((2,2), link=add.s(16))

????????Celery 支持将任务链接在一起,以便一个任务跟随另一个任务。回调任务将应用父任务的结果作为部分参数,这里第一个任务 (4) 的结果将被发送到一个新任务,该任务将前一个结果加 16,形成表达式? (2+2) + 16 = 20

? ? ? ? ?(2)delay()方法,该方法是apply_async的快捷方式,提供便捷的异步调度,但是如果想要更多的参数控制,就必须使用apply_async方法。

? ? ? ? (3)直接调用,相当于普通函数的调用,但是不会再worker上执行。

6. Celery框架

? ? ? ? 前面的知识对Celery程序进行了初探,有了初步了解后,我们再来看看Celery的架构,有助于深入理解Celery。

? ? ? ? ?任务生产者生产任务并将任务发送给你中间人worker,有多个消费者,即执行单元worker持续地监控消息中间人,如有属于自己队列的任务需要执行,就从中间人取出作业名称,查找对应的函数代码并执行。执行完成后将结果存储在Backend。这里的worker可以分不熟部署。彼此之间是独立的。

? ? ? ? 任务调度器Beat:Celery Beat进程会读取配置文件的内容,然后将配置中需要执行的任务发送给中间人。

7. celery路由任务分配队列

? ? ? ? Celery非常容易设置和运行,它通常会使用默认名为Celery的队列(可以通过CELERY_DEFAULT_QUEUE修改)来存放任务。Celery支持同时运行多个队列,还可以使用优先级不同的队列来确保高优先级的任务不需要等待就立即执行。

? ? ? ? 基于前面的示例,我们来实现不同队列来执行不同的任务:任务add在队列default执行,任务taskA在队列tasks_A执行,任务taskB在队列tasks_B执行。

【示例4】将任务自动分配队列

????????首先修改配置文件setting.py

from kombu import Queue

task_queues= (  # 定义任务队列
    Queue("default", routing_key="task.#"),  # 路由键以“task.”开头的消息都进default队列
    Queue("tasks_A", routing_key="A.#"),     # 路由键以“A.”开头的消息都进tasks_A队列
    Queue("tasks_B", routing_key="B.#"),     # 路由键以“B.”开头的消息都进tasks_B队列
)

task_routes = (
    [
        ("myCeleryProj.tasks.add", {"queue": "default"}),    # 将add任务分配至队列 default
        ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),  # 将taskA任务分配至队列 tasks_A
        ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),  # 将taskB任务分配至队列 tasks_B
    ],
)

# 使用redis 作为消息代理
broker_url = 'redis://127.0.0.1:6379/0'

# 任务结果存在Redis
result_backend = 'redis://127.0.0.1:6379/0'

# 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
result_serializer = 'json'

? ? ? ? 一次启动对个队列,执行以下命令:

celery -A myCeleryProj.app worker -Q default,tasks_A,tasks_B -l info

? ? ? ?为了方便演示,我们开了三个终端,分别启动三个队列

celery -A myCeleryProj.app worker -Q default -l info
celery -A myCeleryProj.app worker -Q tasks_A -l info
celery -A myCeleryProj.app worker -Q tasks_B -l info

? ? ? ??然后再开一个终端来调用task

>>> from myCeleryProj.tasks import *
>>> add.delay(1,1);taskA.delay();taskB.delay()
<AsyncResult: 0b0aaef7-c8af-4dc0-8bd3-0224d720f9bc>
<AsyncResult: 5ac76092-55f6-4a0b-bf1f-63cae36832a9>
<AsyncResult: f11e0f76-cf3a-426d-bc29-438b94288da6>

? ? ? ?可以看到三个终端都有一个任务的输出显示

? ? ? ? 任务的路由:前面的代码中觉定任务具体在哪个队列?运行是通过下面的代码去分配的

task_routes = (
    [
        ("myCeleryProj.tasks.add", {"queue": "default"}),    # 将add任务分配至队列 default
        ("myCeleryProj.tasks.taskA", {"queue": "tasks_A"}),  # 将taskA任务分配至队列 tasks_A
        ("myCeleryProj.tasks.taskB", {"queue": "tasks_B"}),  # 将taskB任务分配至队列 tasks_B
    ],
)

? ? ? ? ?实际生产环境可能有对个任务需要路由,可以使用正则的方式批量分配任务到队列中

????????将tasks.py中所有的任务分配到队列default

task_routes = (
    [
        ("myCeleryProj.tasks.*", {"queue": "default"}),
    ],
)

? ? ? ? 将任务taskA和taskB分配到队列task_A中,任务add分配到default中

task_routes = (
    [
        # 将add任务分配至队列 default
        (
            "myCeleryProj.tasks.add",
            {"queue": "default", "routing_key": "task.default"}
        ),
        # 将taskA taskB任务分配至队列 tasks_A
        (
            re.compile(r'myCeleryProj\.tasks\.(taskA|taskB)'),
            {"queue": "tasks_A", "routing_key": "A.#"}),
    ],
)

? ? ? ? 更多有关路由的信息参考:Routing Tasks — Celery 5.2.0b3 documentation

8. Celery Beat自动任务调度

? ? ? ? 前面演示的任务调度都是手动出发的,使用Celery Beat可以自动调度任务。

? ? ? ? Celery Beat是Celery的调度器,可定期启动任务,然后由集群中的可用工作节点worker执行这些任务。默认情况下,Beat进程读取配置文件中的CELERYBEAT_SCHEDULE的设置,也可以自定义存储,比如将启动任务的规则存储在SQL数据库中。请确保每次调度任务可以运行一个调度程序,否则任务将被重复执行。使用集群的方式意味着调度不需要同步,服务可以在不使用锁的情况下运行。

? ? ? ? 先明确一个概念--时区,间隔性的任务调度默认使用UTC时区,也可以通过设置来改变时区:

# 通过配置文件修改
timezone = "Asia/Shanghai"

# 通过app配置修改
app.conf.timezone = "Asia/Shanghai"

? ? ? ? 时区的设置必须加入Celery的app中,默认的调度器(将调度计划存储在celerybeat-schedule文件中)将自动检测时区是否改变,如果时区改变,则自动重置调度计划。其他调度器可能不会自动重置,比如Django数据库调度器就需要手动重置调度计划。

【示例5】Celery调度

? ? ? ? 仍基于myCeleryProj项目,修改setting文件

# 添加模块
from celery.schedules import crontab, timedelta

# 中间内容按照之前不变

# 添加时区和调度器
timezone = 'Asia/Shanghai'
beat_schedule = {
    "add": {
        "task": "myCeleryProj.tasks.add",
        "schedule": timedelta(seconds=5),  # 定义间隔为5s的任务
        "args": (10, 16),
    },
    "taskA": {
        "task": "myCeleryProj.tasks.taskA",
        "schedule": crontab(hour=15, minute=8),  # 定义间隔为对应时区下15:08分执行的任务
    },
    "taskB": {
        "task": "myCeleryProj.tasks.taskB",
        "schedule": crontab(hour=15, minute=8),  # 定义间隔为对应时区下15:08分执行的任务
    },
}

? ? ? ? 打开两个终端,一个执行worker

celery -A myCeleryProj.app worker -Q tasks_A,tasks_B,default -l info

? ? ? ? 执行beat调度

celery -A myCeleryProj.app beat

? ? ? ? work输出结果:

? ? ? ? 更多定时任务调度参考:Periodic Tasks — Celery 5.2.0b3 documentation?

9. Celery 远程调用

? ? ? ? 未完待续。。。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-11 18:53:24  更:2021-09-11 18:54:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 13:51:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码