IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> fastapi后台任务,接口先返回结果,再执行任务 -> 正文阅读

[Python知识库]fastapi后台任务,接口先返回结果,再执行任务

直接上代码

from exec import *
from fastapi import APIRouter, BackgroundTasks
from pydantic import BaseModel, Field
import asyncio
import time

class ConfigModel(BaseModel):
    control_vm_conf: dict = Field(title="master配置信息", default={})
    slaver_vm_conf: dict = Field(title="执行机配置信息", default={})


# 更新config配置的接口
@Router.post("/UpdateConfig", tags=["UpdateConfig"])
async def update_config(config_data: ConfigModel, backgroundTasks: BackgroundTasks):
    if config_data.control_vm_conf:
        print("更新config配置,标记为master")
        backgroundTasks.add_task(update_vm_config, "master", config_data.control_vm_conf)
    elif config_data.slaver_vm_conf:
        print("更新config配置,标记为slaver")
        backgroundTasks.add_task(update_vm_config, "slaver", config_data.slaver_vm_conf)
    else:
        print("缺少参数")
    return {"message": "config任务已提交"}

async def update_vm_config(vm_type: str, new_config: dict):
    """更新config配置"""
    key_list = ["control_mgmt_ip", "control_mgmt_netmask", "control_mgmt_gateway"]
    new_config_copy = new_config.copy()
    print(new_config)
    if vm_type == "master":
        # 检查传到参数 是否有数组中的字段
        for k in new_config_copy.keys():
            if not key_list.__contains__(k):
                if k == "control_mgmt_netmask":
                    new_config["control_mgmt_netmask"] = "255.254.0.0"
                elif k == "control_mgmt_gateway":
                    new_config["control_mgmt_gateway"] = "10.172.55.53"
                else:
                    print("config缺少{}字段".format(k))
    else:
        for k in new_config_copy.keys():
            if key_list.__contains__(k):
                new_config.pop(k)
                print("非master的config配置不允许有{}字段,已删除!".format(k))

    # 全量替换
    with open(r"C:\test\bin\config.json", "w") as f:
        f.write(json.dumps(new_config))
    # 停监听
    Config.take_effect_config(vm_type)
    # 配ip
    if vm_type != "master":
        asyncio.create_task(Config.delete_mgmt_ip())

结果为:
1、接口先返回字典 {“message”: “config任务已提交”} 消息,状态为200
2、后台继续执行update_vm_config函数,直到完成

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-03-21 20:45:38  更:2022-03-21 20:46:28 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/15 19:37:56-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码