IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> celery包结构、celery任务、双写一致性 -> 正文阅读

[Python知识库]celery包结构、celery任务、双写一致性


一、celery包结构

celery编写为包结构后,可以随意拔插到任意项目中使用,也是celery官方推荐写法。

第一步:
创建一个celery包

第二步:
创建celery.py(固定名字不能自定义)

from celery import Celery
backend = 'redis://127.0.0.1:6379/1'  # 结果存储
broker = 'redis://127.0.0.1:6379/2'  # 消息中间件
app = Celery('main', backend=backend, broker=broker,
             include=['celery_task.seckill_task', # celery任务
                      'celery_task.get_result']) # 获取celery任务结果
                      #include中加入需要执行的celery任务

第三步:
创建celery任务(延迟任务、异步任务、定时任务)
seckill_task.py 秒杀任务
编写秒杀任务时如果有用到项目中的某些模块、代码需要将该项目导入到环境变量

import os,django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')# project_name 项目名称
django.setup()
from celery_task.celery import app
from django_redis import get_redis_connection

@app.task
def seckill(shop_id, user_id):
    redis = get_redis_connection()
    name = 'seckill_table_' + shop_id
    if redis.llen(name) <= 100:
        redis.lpush(name, shop_id, user_id)
        return True
    else:
        return False

get_result.py 获取celery任务结果

from celery_task.celery import app
from celery.result import AsyncResult

def result(id):
    asy = AsyncResult(id=id, app=app)
    if asy.successful():
        result = asy.get()
        return True, '秒杀成功'
    elif asy.failed():
        return False, '秒杀失败'
    elif asy.status == 'PENDING':
        return True, '秒杀等待中被执行'
    elif asy.status == 'RETRY':
        return True, '秒杀异常后正在重试'
    elif asy.status == 'STARTED':
        return True, '秒杀已经开始被执行'

第四步:
在需要启动celery任务的地方启动即可

    def post(self, request):
        from celery_task.seckill_task import seckill
        user_id = request.data.get('user_id')
        res=seckill.delay('1001',user_id) # 异步任务使用.delay来加入消息队列
        return Luffy_Response(200, msg='正在秒杀中',task_id=str(res))

    def get(self, request):
        from celery_task.get_result import result
        task_id = request.query_params.get('task_id')
        key, asy = result(id=task_id) # 此处为普通任务
        if key:
            return Luffy_Response(200, msg=asy, key=201)
        return Luffy_Response(1001, msg=asy, key=1001)

第五步:
编写完celery和celery任务后,启动worker和beat
beat(发布celery定时任务)

celery -A celery包名 beat -l info

worker(负责进行celery任务执行)

celery -A celery包名 worker -l info -P eventlet

二、celery任务

1.异步任务

使用形式:

任务.dealy(参数)

2.延迟任务

使用时间对象来设置
任务.apply_async(args=[参数],eta=时间对象)

使用时间来设置
任务.apply_async(args=[参数],countdown=10)

	参数retry:如果任务失败后,是否重试,默认为True

3.定时任务

在配置文件中配置定时任务

from datetime import timedelta
        from celery.schedules import crontab
        app.conf.beat_schedule = {
            'send_sms': { #定时任务名字
                'task': 'celery_task.user_task.send_sms', # 配置定时任务执行内容
                'schedule': timedelta(seconds=3), # 三秒后
                # 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
                'args': ('1895367221', '8888'), # 定时任务所需要参数
            }
        }

三、双写一致性

一个软件有部分数据是被用户大量访问的,例如首页、推荐等
针对于这些接口,如果每次都要从数据库中获取会影响软件运行效率,这个时候可以将经常被访问的数据存放在缓存的,这个时候可以使用redis来进行。

但是使用了redis虽然缓解了访问的压力,但是数据库中的数据一但变动了,缓存中的数据还是以前的,就会导致数据的不一致。这个时候就需要用到双写一致性的原理。

双写一致性方案:
1.读请求写请求串行化执行
2.先更新数据库,再更新缓存
3.先删除缓存,再更新数据库
4.旁路缓存模式(Cache Aside Pattern)

此处以第二种方案示例:
此处以轮播图定时缓存更新为例

1.生成定时任务来更新缓存
banner_task.py

import os,django

from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'luffyapi.settings.dev')# project_name 项目名称
django.setup()
from home.models import Banner
from home.serializer import BannerSerializer
from django.core.cache import cache

def now_update_banner():
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    res = BannerSerializer(data=queryset, many=True)
    cache.set('banner_list', res.data)


配置文件

# 配置celery定时任务
from datetime import timedelta
from celery.schedules import crontab
from celery_task.celery import app
app.conf.beat_schedule = {
    'now_update_banner': {
        'task': 'celery_task.banner_task',
        'schedule': timedelta(seconds=50),
    }
}

视图类

class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        data = cache.get('banner_list')
        if data:
            logger.info('查询了缓存')
            return Luffy_Response(200, '查询轮播图数据成功', data=data)
        else:
            res = super().list(request, *args, **kwargs)
            logger.info('查询了mysql数据库')
            cache.set('banner_list', res.data)
            return Luffy_Response(200, '查询轮播图数据成功', data=res.data)

2.启动项目打开worker和beat

celery -A celery_task beat -l info
celery -A celery_task worker -l info -P eventlet
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-07-21 21:30:58  更:2022-07-21 21:31:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 11:47:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码