一、celery包结构
celery编写为包结构后,可以随意拔插到任意项目中使用,也是celery官方推荐写法。
第一步: 创建一个celery包
第二步: 创建celery.py(固定名字不能自定义)
from celery import Celery
backend = 'redis://127.0.0.1:6379/1'
broker = 'redis://127.0.0.1:6379/2'
app = Celery('main', backend=backend, broker=broker,
include=['celery_task.seckill_task',
'celery_task.get_result'])
第三步: 创建celery任务(延迟任务、异步任务、定时任务) seckill_task.py 秒杀任务 编写秒杀任务时如果有用到项目中的某些模块、代码需要将该项目导入到环境变量
import os,django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
django.setup()
from celery_task.celery import app
from django_redis import get_redis_connection
@app.task
def seckill(shop_id, user_id):
redis = get_redis_connection()
name = 'seckill_table_' + shop_id
if redis.llen(name) <= 100:
redis.lpush(name, shop_id, user_id)
return True
else:
return False
get_result.py 获取celery任务结果
from celery_task.celery import app
from celery.result import AsyncResult
def result(id):
asy = AsyncResult(id=id, app=app)
if asy.successful():
result = asy.get()
return True, '秒杀成功'
elif asy.failed():
return False, '秒杀失败'
elif asy.status == 'PENDING':
return True, '秒杀等待中被执行'
elif asy.status == 'RETRY':
return True, '秒杀异常后正在重试'
elif asy.status == 'STARTED':
return True, '秒杀已经开始被执行'
第四步: 在需要启动celery任务的地方启动即可
def post(self, request):
from celery_task.seckill_task import seckill
user_id = request.data.get('user_id')
res=seckill.delay('1001',user_id)
return Luffy_Response(200, msg='正在秒杀中',task_id=str(res))
def get(self, request):
from celery_task.get_result import result
task_id = request.query_params.get('task_id')
key, asy = result(id=task_id)
if key:
return Luffy_Response(200, msg=asy, key=201)
return Luffy_Response(1001, msg=asy, key=1001)
第五步: 编写完celery和celery任务后,启动worker和beat beat(发布celery定时任务)
celery -A celery包名 beat -l info
worker(负责进行celery任务执行)
celery -A celery包名 worker -l info -P eventlet
二、celery任务
1.异步任务
使用形式:
任务.dealy(参数)
2.延迟任务
使用时间对象来设置
任务.apply_async(args=[参数],eta=时间对象)
使用时间来设置
任务.apply_async(args=[参数],countdown=10)
参数retry:如果任务失败后,是否重试,默认为True
3.定时任务
在配置文件中配置定时任务
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'send_sms': {
'task': 'celery_task.user_task.send_sms',
'schedule': timedelta(seconds=3),
'args': ('1895367221', '8888'),
}
}
三、双写一致性
一个软件有部分数据是被用户大量访问的,例如首页、推荐等 针对于这些接口,如果每次都要从数据库中获取会影响软件运行效率,这个时候可以将经常被访问的数据存放在缓存的,这个时候可以使用redis来进行。
但是使用了redis虽然缓解了访问的压力,但是数据库中的数据一但变动了,缓存中的数据还是以前的,就会导致数据的不一致。这个时候就需要用到双写一致性的原理。
双写一致性方案: 1.读请求写请求串行化执行 2.先更新数据库,再更新缓存 3.先删除缓存,再更新数据库 4.旁路缓存模式(Cache Aside Pattern)
此处以第二种方案示例: 此处以轮播图定时缓存更新为例
1.生成定时任务来更新缓存 banner_task.py
import os,django
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')
django.setup()
from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache
def now_update_banner():
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
res = BannerSerializer(data=queryset, many=True)
cache.set('banner_list', res.data)
配置文件
from datetime import timedelta
from celery.schedules import crontab
from celery_task.celery import app
app.conf.beat_schedule = {
'now_update_banner': {
'task': 'celery_task.banner_task',
'schedule': timedelta(seconds=50),
}
}
视图类
class BannerView(GenericViewSet, ListModelMixin):
queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
serializer_class = BannerSerializer
def list(self, request, *args, **kwargs):
data = cache.get('banner_list')
if data:
logger.info('查询了缓存')
return Luffy_Response(200, '查询轮播图数据成功', data=data)
else:
res = super().list(request, *args, **kwargs)
logger.info('查询了mysql数据库')
cache.set('banner_list', res.data)
return Luffy_Response(200, '查询轮播图数据成功', data=res.data)
2.启动项目打开worker和beat
celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
|