Celery介绍和使用
一.Celery介绍:
- 一个简单、灵活且可靠、处理大量消息的分布式系统,可以在一台或者多台机器上运行。 Celery是一个功能完备即插即用的任务队列
- 单个 Celery 进程每分钟可处理数以百万计的任务。通过消息进行通信,使用消息队列(broker)在客户端和消费者之间进行协调。
安装Celery:
$ pip install -U Celery
Celery官方文档:https://docs.celeryq.dev/en/latest/index.html
1. 生产者消费者设计模式
最常用的解耦方式之一,寻找中间人(broker)搭桥,保证两个业务没有直接关联。 我们称这一解耦方式为:生产者消费者设计模式 总结:
- 生产者生成消息,缓存到消息队列中,消费者读取消息队列中的消息并执行。
- 由美多商城生成发送短信消息,缓存到消息队列中,消费者读取消息队列中的发送短信消息并执行。
2.中间人broker
示例:此处演示Redis数据库作为中间人broker Celery需要一种解决消息的发送和接受的方式,我们把这种用来存储消息的的中间装置叫做message broker, 也可叫做消息中间人。 作为中间人,我们有几种方案可选择:
1.RabbitMQ
如果使用的是Ubuntu或者Debian发行版的Linux,可以直接通过命令安装RabbitMQ:
sudo apt-get install rabbitmq-server
安装完毕之后,RabbitMQ-server服务器就已经在后台运行。
如果用的并不是Ubuntu或Debian, 可以在以下网址: http://www.rabbitmq.com/download.html 去查找自己所需要的版本软件。
2.Redis
Redis也是一款功能完备的broker可选项,但是其更可能因意外中断或者电源故障导致数据丢失的情况。 关于是由那个Redis作为Broker,可访下面网址: http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis
3.celery框架伪代码
class Broker(object):
broker_list = []
class Worker(object):
def run(self, broker, func):
if func in broker.broker_list:
func()
else:
return 'error'
class Celery(object):
def __init__(self):
self.broker = Broker()
self.worker = Worker()
def add(self, func):
self.broker.broker_list.append(func)
def work(self, func):
self.worker.run(self.broker,func)
def send_sms_code():
print('send_sms_code')
app=Celery()
app.add(send_sms_code)
app.work(send_sms_code)
2.使用
1)创建Celery实例并加载配置 1.定义Celery包 2.创建Celery实例 mian文件代码celery_tasks.main.py
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'meiduo_mall.settings')
from celery import Celery
app=Celery('celery_tasks')
app.config_from_object('celery_tasks.config')
app.autodiscover_tasks(['celery_tasks.sms','celery_tasks.email'])
3.加载Celery配置 代码config.py
broker_url = "redis://127.0.0.1/15"
2)定义发送短信任务
1.注册任务:celery_tasks.main.py
app.autodiscover_tasks(['celery_tasks.sms','celery_tasks.email'])
2.定义任务:celery_tasks.sms.tasks.py task.py 代码
from libs.yuntongxun.sms import CCP
from celery_tasks.main import app
@app.task
def celery_send_sms_code(mobie,code):
CCP().send_template_sms(mobie,[code,5],1)
示例demo
from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')
@celery_app.task(name='send_sms_code')
def send_sms_code(mobile, sms_code):
"""
发送短信异步任务
:param mobile: 手机号
:param sms_code: 短信验证码
"""
try:
send_ret = CCP().send_template_sms(mobile, [sms_code, 5], 1)
except Exception as e:
logger.error(e)
4)启动Celery服务 进入虚拟环境,输入命令:$ celery -A celery_tasks.main worker -l INFO
- -A指对应的应用程序, 其参数是项目中 Celery实例的位置。
- worker指这里要启动的worker。
- -l指日志等级,比如info等级。
5)调用发送短信任务
发送短信业务逻辑代码
from celery_tasks.sms.tasks import celery_send_sms_code
celery_send_sms_code.delay(mobile,sms_code)
示例demo
send_sms_code.delay(mobile, sms_code)
6)补充celery worker的工作模式
- 默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
- 如何自己指定进程数: celery worker -A proj --concurrency=4
- 如何改变进程池方式为协程方式: celery worker -A proj --concurrency=1000 -P eventlet
-c 1000
安装eventlet模块: $ pip install eventlet
启用 Eventlet 池: $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000
|