IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> Flask 常用组件 -> 正文阅读

[Python知识库]Flask 常用组件

作者:token comment

Flask常用组件

一、 flask_session

session 是基于cookie实现, 保存在服务端的键值对(形式为 {随机字符串:'uuid'}), 同时在浏览器中的cookie中也对应一相同的随机字符串,用来再次请求的 时候验证,这个组件的作用是将session数据存储到数据库中

1、 常用配置

# 对 session 进行保护
SECRET_KEY = os.urandom(32)  # 创建密钥
SESSION_USE_SIGNER = True  # 是否签名保护

PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)  # 设置时间延迟
SESSION_REFRESH_EACH_REQUEST = True  # 每刷新一次就更新一次session

SESSION_TYPE = "redis"  # 设置redis存储session数据
SESSION_REDIS = Redis("127.0.0.1")  # 连接数据库
SESSION_PREFIX = "MyWebPrefix:"  # 设置session前缀,默认为session:

注意:session中存储的是字典,当修改字典内部(第二级)元素时,会造成数据不更新

解决方法:

我们要先阅读一下部分源码

class SecureCookieSession(CallbackDict, SessionMixin):
    #: When data is changed, this is set to ``True``. Only the session
    modified = False  # 只有当session里面的数据改变的时候,这个才会被设置成True

    #: When data is read or written, this is set to ``True``. Used by
    accessed = False

    def __init__(self, initial: t.Any = None) -> None:
        def on_update(self) -> None:
            self.modified = True  # 如果数据更新,就会被改为True,后面就会对session设置到cookies中
            """
			if not (session.modified  # 如果session修改,就会重新设置到cookies中
			or 
			(
            session.permanent  # 是否设置到cookie里面
            and 
            app.config["SESSION_REFRESH_EACH_REQUEST"]  # 如果为True,则每次请求刷新一次
        	)):
            return
            ...
            response.set_cookie(...)
            """
            self.accessed = True  # 如果session已读,则将session写入response中
            """
            if session.accessed:
            	response.vary.add("Cookie")
            """
		# 将on_update()传递给CallbackDict
        super().__init__(initial, on_update)

解决方法:

  1. 方法一

    session.modified = True  # 手动使得session标记为session的内容被修改
    
  2. 方法二

    app.config["SESSION_REFRESH_EACH_REQUEST"] = True  # 每刷新一次就更新
    # 同时,在登录成功之后,设置
    session.permanent = True  # 默认为 False,但是如果使用redis的话,就不需要设置,其默认为True
    

2、 使用方法

2.1 session_interface

通过session_interface来设置

from flask_session import RedisSessionInterface
from flask import Flask
from os import urandom
from redis import Redis

app = Flask(__name__)
app.serect_key = urandom(32)  # 设置32位随机密钥
app.config["SESSION_USE_SIGNER"] = True  
# 通过redis保存session
app.session_interface = RedisSessionInterface(
    redis=Redis("127.0.0.1"),  # 连接Redis数据库
    key_prefix="flask_login",  # 设置随机字符串的前缀,默认为session:
)

默认session设置为

from flask.sessions import SecureCookieSessionInterface
app.session_interface = SecureCookieSessionInterface()

修改后,就可以直接使用了,其使用方法和原先的一样

2.2 config

通过配置文件来设置

from flask_session import Session
from flask import Flask
from os import urandom
from redis import Redis

settings = {
    "SESSION_TYPE": "redis",  # 使用redis数据库连接
    "SESSION_REDIS": Redis("127.0.0.1"),  # 连接redis数据库
    "SECRET_KEY": os.urandom(32),  # 创建密钥
    "SESSION_USE_SIGNER": True,  # 是否签名保护
    "SESSION_REFRESH_EACH_REQUEST"True# 每刷新一次就更新数据
}
app.config.from_mapping(settings)  # 设置配置
Session(app)  # 内部封装了设置app.session_interface的方法

二、 DBUtils

1、 引言

当我们要对数据库进行操作时,可以这么干:

import pymysql
from functools import wraps
SQL_CONFIG = {
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
}

def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""
    @wraps(fun)
    def inner(sql, *args):
        conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
        obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
        conn.commit()  # 提交事务
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj  # 返回数据

    return inner


class SQLTool:
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

if __name__ == '__main__':
    obj = SQLTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
    print(obj)

问题来了,如果有很多连接的话,开启数据库再关闭是不是很麻烦呢?

  • 我们可以使用数据库连接池:DBUtils

2、 DBUtils

使用数据库连接池

此连接池有两种模式:

  1. 为每一个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新发到连接池,供自己线程再次使用。当线程终止,连接自动关闭
  2. 创建一批连接到连接池,供所有线程共享使用(主要)

2.1 模式一

POOL = PersistentDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    closeable=False,
    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
    threadlocal=None,  # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)

def func():
    conn = POOL.connection(shareable=False)
    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    cursor.close()
    conn.close()

func()

2.2 模式二

import time
import pymysql
import threading
from dbutils.pooled_db import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='pooldb',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '\r\n')

    cursor = conn.cursor()
    cursor.execute('select * from tb1')
    result = cursor.fetchall()
    conn.close()

func()

3、 代码封装

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# 使用单例模式创建一个数据库连接池
from dbutils.pooled_db import PooledDB
import pymysql
SQL_CONFIG = {
    # 对数据库的配置
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "passwd": "qwe123",
    "db": "flask1",
    "charset": "utf8",
}
POOL_CONFIG = {
    #  对数据库连接池的配置
    "maxconnections": 6,
    "mincached": 2,
    "maxcached": 5,
    "maxshared": 3,
    "blocking": True,
    "maxusage": None,
    "setsession": [],
    "ping": 0,
}

POOL = PooledDB(
    creator=pymysql,
    **SQL_CONFIG,
    **POOL_CONFIG
)

def select_sql(type_):
    def sql_outer(fun):
        """使用装饰器,可以对数据库便捷操作"""

        @wraps(fun)
        def inner(sql, *args):
            if type_ == "sql":
                """如果通过数据库来获取值的话"""
                conn = pymysql.connect(**SQL_CONFIG)  # 连接数据库
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)  # 返回字典类型的SQL数据
                obj = fun(sql, cursor, *args)  # 将参数传入函数中,进行运行
                conn.commit()  # 提交事务
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj  # 返回数据
            elif type_ == "pool":
                # 通过数据库连接池取值
                conn = POOL.connection()
                cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
                obj = fun(sql, cursor, *args)
                conn.commit()
                cursor.close()  # 关闭游标
                conn.close()  # 关闭连接
                return obj
            raise ValueError("type_ value error, value = pool or sql")
        return inner

    return sql_outer


class SQLTool:
    """使用pymysql单线程连接"""

    @staticmethod
    @select_sql("sql")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @select_sql("sql")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj


class DBUtilsTool:
    """使用数据库连接池"""

    @staticmethod
    @select_sql("pool")
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @select_sql("pool")
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

obj = DBUtilsTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)

4、 结合flask使用

4.1 方式一

无法创建的原因是flask的上下文管理机制,当flask还没run()的时候,无法访问current_app里面的配置信息

将配置信息放到settings配置文件中,再使用init_app方法——模拟Session(app),把数据库连接池放到配置文件config里面

在存放数据库连接池的文件sql.py中:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app
from dbutils.pooled_db import PooledDB


def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池,模拟Session(app)"""
    def __init__(self, app=None):
        self.app = app
        if app:
            self.init_app(app)

    def init_app(self, app):
        POOL = PooledDB(
            **app.config["POOL_CONFIG"]
        )
        app.config["SQL_POOL"] = POOL

    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

settings.py文件中:

import pymysql

class BaseConfig:
    POOL_CONFIG = {
        #  对数据库连接池的配置
        "creator": pymysql,
        "maxconnections": 6,
        "mincached": 2,
        "maxcached": 5,
        "maxshared": 3,
        "blocking": True,
        "maxusage": None,
        "setsession": [],
        "ping": 4,
        # 对数据库的配置
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "passwd": "qwe123",
        "db": "flask1",
        "charset": "utf8",
    }

manage.py主运行程序中

from sql import DBUtilsTool
from flask import Flask

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    # 模拟session_redis 将app传入,初始化数据库连接池
    DBUtilsTool(app)  # 在程序刚启动的时候,flask还没开始运行,就把配置文件放到其中,flask开启的时候,就已经将数据库连接池存储到了config中
    return app


if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")

4.2 方式二

直接将数据库连接池添加到配置文件中

sql.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
from flask import current_app


def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        POOL = current_app.config["SQL_POOL"]  # 将其存储到配置文件中,防止连接池不存在
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

settings.py

from dbutils.pooled_db import PooledDB

class BaseConfig:
    SQL_POOL = PooledDB(
                #  对数据库连接池的配置
                "creator": pymysql,
                "maxconnections": 6,
                "mincached": 2,
                "maxcached": 5,
                "maxshared": 3,
                "blocking": True,
                "maxusage": None,
                "setsession": [],
                "ping": 4,
                # 对数据库的配置
                "host": "127.0.0.1",
                "port": 3306,
                "user": "root",
                "passwd": "qwe123",
                "db": "flask1",
                "charset": "utf8",
            )

manage.py

from sql import DBUtilsTool
from flask import Flask

def create_app():
    app = Flask(__name__)
    app.config.from_object("settings.BaseConfig")
    return app


if __name__ == '__main__':
    app = create_app()
    app.run("0.0.0.0")

不建议使用方法二,其没有做到配置与程序分离的效果

4.3 方法三

使用pool.py在里面创建POOL

from flask import current_app


POOL = PooledDB(
                **current_app.config["POOL_CONFIG"]
            )

sql.py

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# @author: A.L.Kun
# @file : sql.py
# @time : 2022/6/5 0:29
import pymysql
from functools import wraps
# from .pool import POOL  # 注意不能在这里导入,因为app创建时,注册蓝图的时候,app并没有运行,故current_app里面没有配置信息

def sql_outer(fun):
    """使用装饰器,可以对数据库便捷操作"""

    @wraps(fun)
    def inner(sql, *args):
        # 通过数据库连接池取值
        from .pool import POOL  # 但路由中调用的时候导入,app肯定运行起来了,故current_app里面肯定有配置信息
        conn = POOL.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        obj = fun(sql, cursor, *args)
        conn.commit()
        cursor.close()  # 关闭游标
        conn.close()  # 关闭连接
        return obj

    return inner


class DBUtilsTool:
    """使用数据库连接池"""
    @staticmethod
    @sql_outer
    def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        # print(*args)
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入
        obj = cursor.fetchall()  # 获取全部数据
        return obj

    @staticmethod
    @sql_outer
    def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
        cursor.execute(sql, *args)  # 注意不能使用字符串的拼接,防止SQL注入,同时要对元组进行解包
        obj = cursor.fetchone()  # 获取第一条数据
        return obj

要注意current_app的创建时期,其他的使用方法都一样

只要写原生SQL,就要使用数据库连接池

三、 wtforms

作用:专门用于对python web框架做表单验证

1、 支持的字段

字段描述
StringField文本字段
TextAreaField多行文本字段
PasswordField密码文本字段
HiddenField隐藏文本字段
DateField文本字段,值为 datetime.date 格式
DateTimeField文本字段,值为 datetime.datetime 格式
IntegerField文本字段,值为整数
DecimalField文本字段,值为 decimal.Decimal
FloatField文本字段,值为浮点数
BooleanField复选框,值为 True 和 False
RadioField一组单选框
SelectField下拉列表
SelectMultipleField下拉列表,可选择多个值
FileField文件上传字段
SubmitField表单提交按钮
FormField把表单作为字段嵌入另一个表单
FieldList一组指定类型的字段

2、 字段参数

参数描述
label字段别名,在页面中可以通过字段.label展示
validator验证规则列表
filters过氯器列表,用于对提交数据进行过滤
description描述信息,通常用于生成帮助信息
id表示在form类定义时候字段的位置,通常你不需要定义它,默认会按照定义的先后顺序排序
default默认值
widgethtml插件,通过该插件可以覆盖默认的插件,更多通过用户自定义
render_kw自定义html属性
choices复选类型的选项

3、 验证函数

验证函数说明
Email验证是电子邮件地址
EqualTo比较两个字段的值; 常用于要求输入两次密钥进行确认的情况
IPAddress验证IPv4网络地址
Length验证输入字符串的长度
NumberRange验证输入的值在数字范围内
Optional无输入值时跳过其它验证函数
DataRequired确保字段中有数据
Regexp使用正则表达式验证输入值
URL验证url
AnyOf确保输入值在可选值列表中
NoneOf确保输入值不在可选列表中

可以根据验证函数的源码,来自定义验证函数

4、 代码实例

创建一个上传文件功能的app,里面包含了对其工作流程的关键步骤分析,希望对理解有用

代码实例:

app.py

from flask import Flask
from wtforms import FileField, Form, SubmitField, widgets, validators
from flask import (request, render_template, current_app)
from werkzeug.utils import secure_filename
import os


class UploadForm(Form):
    """创建上传文件的字段"""
    # UploadForm.checkbox = UnboundField(BooleanField, *args, **kwargs, creation_counter=1)
    file1 = FileField(
        render_kw={"class": "file"},
        widget=widgets.FileInput(),
        validators=[
            validators.DataRequired(message="请选择要上传的文件哦!")
        ]
    )
    #  UploadForm.sbtn = UnboundField(SubmitField, *args, **kwargs, creation_counter=2)
    sbtn = SubmitField(
        render_kw={
            "id": "upload_submit",
            "value": "上传文件",
        },
        widget=widgets.SubmitInput(),
    )


config = {
    "UPLOAD_FOLDER": "upload/",  # 设置存储文件的文件夹
}
app = Flask(__name__)
app.config.from_mapping(config)


@app.route("/upload", methods=['GET', 'POST'])
def upload():
    # 用于专门处理提交的数据的路由
    if request.method == "POST":
        form = UploadForm(formdata=request.files)  # 上传文件
        if form.validate():  # 对数据进行验证
            f = form.file1.data
            f.save(os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(f.filename)))  # 将文件名称安全处理
            return render_template("index.html", form=form, msg=f"上传文件成功!上传的文件为{f.filename}")
        else:
            return render_template("index.html", form=form, msg="请选择文件哦!")
    form = UploadForm()
    """
    # 对该类创建时的源码分析
    metaclass=FormMeta  # 其为元类
    # 故,创建对象时
    1、 FormMeta.__call__
        # 在 __call__ 方法里面进行的步骤
        UploadForm._unbound_fields = None
        UploadForm._wtforms_meta = None
            _unbound_fields = [  # 其根据counter来排序
                ("checkbox": UnboundField(BooleanField, *args, **kwargs, creation_counter=1))
                ("sbtn": UnboundField(SubmitField, *args, **kwargs, creation_counter=1))
                ]
            _wtforms_meta =  type("Meta", tuple([DefaultMeta]), {})  # DefaultMeta = Form.Meta
                          =  class Meta(DefaultMeta): 
                                    pass
                                    
    2、 UploadForm.__new__
        # 在 __new__ 方法里面进行的步骤,然后发现没有 __new__ 方法,除非自定义
        pass
        
    3、 UploadForm.__init__
        # 执行 __init__ 方法
        UploadForm()._fields = {
                "file1": FileField(...),
                "sbtn": SubmitField(...)
            }
            UploadForm().name=FileField(...)
            UploadForm().sbtn=SubmitField(...)
    """
    print(form.file1)
    """
    # 访问类的属性时的源码分析
    # 故,form.name 执行的是字段类中的 __str__ 方法
    Field.__str__ -> return self()  # 执行字段的 __call__ 方法
    Field.__call__ -> return self.meta.render_field(self, kwargs)  
    DefaultMeta.render_field(self, kwargs)  -> return field.widget(field, **render_kw)    # 执行 widget 的 __call__ 方法
    Input.__call__ -> return Markup("<input %s>" % self.html_params(name=field.name, **kwargs))  # 进行渲染
    """
    for i in form:
        print(i)  # 发现i是可以遍历的,
        """
        # 其内部有一个iter方法
        BaseField.__iter__ -> iter(self._fields.values())
        print(i)  # 执行字段内部的 __str__ 方法
        """
    return render_template("index.html", form=form)


if __name__ == '__main__':
    app.run()

还有验证流程,可以自己尝试一下

templates/index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>文件上传</title>
</head>
<body>
    <div class="uploadFile">
        <h1>请选择要上传的文件</h1>
        <form method="POST" enctype="multipart/form-data">
            <p>{{ form.file1 }}</p>
            <p style="color: red">{{ form.file1.errors[0] }}</p>
            <p>{{ form.sbtn }}</p>
        </form>
        <p style="color: green">{{ msg }}</p>
    </div>
</body>
</html>

四、 flask_mail

1、 简介

在web程序中,经常会需要发送电子邮件。比如,在用户注册账户时发送确认邮件;定期向用户发送热门内容或是促销信息等等。在Web程序中发送电子邮件并不复杂,借助扩展Flask-Mail或是第三方邮件服务,只需要几行代码就可以发送邮件

配置信息:

参数描述
MAIL_SERVER邮件服务器的名称/IP地址
MAIL_PORT所用服务器的端口号
MAIL_USE_TLS启用/禁用传输安全层加密
MAIL_USE_SSL启用/禁用安全套接字层加密
MAIL_DEBUG调试支持,默认是Flask应用程序的调试状态
MAIL_USERNAME发件人的用户名
MAIL_PASSWORD发件人的密码
MAIL_DEFAULT_SENDER设置默认发件人
MAIL_MAX_EMAILS设置要发送的最大邮件
MAIL_SUPPRESS_SEND如果app.testing设置为true,则发送被抑制
MAIL_ASCII_ATTACHMENTS如果设置为true,则将附加的文件名转换为ASCII

2、 Mail

它管理电子邮件消息的要求。 类构造函数采用以下形式

方法描述
send()发送Message类对象的内容
connect()与邮件主机打开连接
send_message()发送消息对象

3、 Massage

3.1 实例化对象

Message(
    subject='',  # 设置标题
    recipients=[],  # 收件人
    body=None,  # 发内容
    html=None,
    sender=None,  # 发件人
    cc=None,
    bcc=None,
    attachments=None,  # 附件
    reply_to=None,  
    date=None,
    charset=None,
    extra_headers=None,
    mail_options=None,
    rcpt_options=None
)

3.2 类方法

attach() - 向消息添加附件。 该方法采用以下参数:

  • filename - 要附加的文件的名称
  • content_type - 文件的MIME类型
  • data - 原始文件数据
  • disposition - 内容处置,如果有的话

add_recipient() - 向消息添加另一个收件人

4、 使用方法

  1. 初始化邮箱

    from flask_mail import Mail, Message
    mail = Mail(app)
    
  2. 配置邮箱信息

    class BaseConfig2():
        MAIL_SERVER = "smtp.qq.com"  # 设置SMTP服务器
        MAIL_PORT = 465  # 设置端口
        MAIL_USE_TLS = False  # 是否使用TLSSL加密
        MAIL_USER_SSL = True  # 是否使用SSL加密
        MAIL_USERNAME = "liu.zhong.kun@oxmail.com"  # 邮箱
        MAIL_PASSWORD = "xtisaddfdfntdcjf"  # 密码
        MAIL_DEFAULT_SENDER = "A.L.Kun<liu.zhong.kun@oxmail.com>"  # 发件人
    
  3. 创建信息

    msg = Message(subject="This is title", recipients="3500515050@qq.com")
    msg.body = "This is body"
    
  4. 发送信息

    mail.send(msg)
    

总代码:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
__author__ = "A.L.Kun"
__file__ = "app_.py"
__time__ = "2022/6/24 19:56"
__email__ = "liu.zhong.kun@foxmail.com"
from settings import BaseConfig2
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config.from_object(BaseConfig2)
mail = Mail(app)


@app.route('/')
def hello():
    msg = Message(subject="This is title", recipients="3500515050@qq.com")
    msg.body = "This is body"
    message.html = render_template('content.html')  # 可以发送一个html页面
    mail.send(msg)
    return "信息发送完成!"

if __name__ == '__main__':
    app.run(debug=True)

五、 flask_script

1、 简介

安装:pip install flask-script

通过使用Flask-Script扩展,我们可以在Flask服务器启动的时候,通过命令行的方式传入参数。而不仅仅通过app.run()方法中传参,比如我们可以通过python hello.py runserver –host ip地址,告诉服务器在哪个网络接口监听来自客户端的连接

2、 启动服务

from flask import Flask

app = Flask(__name__)

"""使用flask-script启动项目"""
from flask_script import Manager
manager = Manager(app)

@app.route('/')
def index():
    return 'hello world'

if  __name__ == "__main__"
    manager.run()

在命令行输入python manage.py runserver,即可启动服务

3、 传入参数

from flaskScript import create_app
from flask_script import Manager

app = create_app()  # 返回一个app对象
manager = Manager(app)


@manager.command
def custom(arg):
    """
    这个方法可以接收从命令行输入的参数,位置参数
    如:
        运行:python manage.py custom 123
        其会在控制台打印 123
    """
    print(arg)


@manager.option("-n", "--name", dest='name')
@manager.option("-u", "--url", dest="url")
def cmd_(name, url):
    """
    自定义命令,可选参数
    :param name:从命令行传入的姓名
    :param url: 从命令行传入的url
    :return:
    如:
        输入:python manage.py cmd_ -n "lihua" -u "127.0.0.1"
        则会输出:lihua 127.0.0.1
    """
    print(name, url)


if __name__ == '__main__':
    manager.run()

作用:

  • python manage.py runserver ...
  • python manage.py 自定义命令

六、 flask_sqlalchemy

1、 简介

sqlalchemy这里面有sqlalchemy的一些基本操作

flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作

安装:

pip install flask-sqlalchemy

配置参数

配置选项说明
SQLALCHEMY_DATABASE_URI连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8
SQLALCHEMY_BINDS一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库.
SQLALCHEMY_ECHO调试设置为true
SQLALCHEMY_POOL_SIZE数据库池的大小,默认值为5。
SQLALCHEMY_POOL_TIMEOUT连接超时时间
SQLALCHEMY_POOL_RECYCLE自动回收连接的秒数。
SQLALCHEMY_MAX_OVERFLOW控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。
SQLALCHEMY_TRACK_MODIFICATIONS如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。

2、 使用步骤

  1. 配置数据库信息,在settings.py文件中添加

    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:qew123@127.0.0.1:3306/flask1?charset=utf8"
    
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ECHO = True
    
  2. 创建一个app.py文件,在里面添加

    from flask_sqlalchemy import SQLAlchemy
    
    db = SQLAlchemy()  # 实例化对象
    app = Flask(__name__)
    app.config.from_file("settings.py")  # 导入配置文件
    db.init_app(app)  # 初始化sqlalchemy,使用app里面的配置文件
    
  3. 编写数据库结构,创建一个models.py,与app.py同级

    from app import db
    
    class Users(db.Model):
        __table__ = "user"
        id = db.Column(db.INTEGER, primary_key=True, autoincrement=True)
        name = db.Column(db.String(32))
    
  4. 在路由中使用

    import models
    
    @app.route("/login", methods=["GET", "POST"])
    def login():
        data = db.session.query(models.Users).all()  # 查找数据库里面的所有信息
        print(data)
        db.session.remove()  # 移除session
        return "Login"
    

扩展,离线脚本的使用

from app import app
with app.app_context():
    pass  # 里面可以运行flask运行时进行的操作

七、 flask_migrate

作用:做数据库迁移

其依赖于:flask-script/flask-sqlalchemy

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_script import Manager

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///app.db"  # 连接数据库

db = SQLAlchemy(app)  
manager = Manager(app)
Migrate(app, db)  # 实例化组件

"""
flask db init  # 初始化表
flask db migrate  # 将表在数据库中创建出来
flask db upgrade  # 更新表的结构
"""


# Model
class User(db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(128))


if __name__ == '__main__':
    manager.run()

八、 自定义组件

auto.py中添加:

from flask import request, session, redirect


class Auth:
    def __init__(self, app=None):
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        app.auto = self  # 将这个类的信息全部写入app里面
        self.app = app
        self.app.before_request(self.check_login)

    def check_login(self):
        print("检测用户是否登录")
        usr = session.get("usr")
        print(usr)

    def login(self, data):
        """创建session"""
        session["usr"] = data

    def login_out(self):
        """用户登出"""
        del session["usr"]

使用这个组件时:

from auto import Auto
from flask import Flask


app = Flask(__name__)

at = Auto()
at.init_app(app)


################################################
from flask import current_app
cerrent_app.login_out()  # 调用组件登出的功能 

最后,总目录结构为:

https://images.cnblogs.com/cnblogs_com/blogs/722174/galleries/2074790/o_220701084945_sort.png

点击我,查看源代码

九、 其它

1、 多app应用

from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple

app01 = Flask("app01")
app02 = Flask("app02")
# //login -> 访问 app01下面的login
dm = DispatcherMiddleware(app01, {
    "/app02": app02  # /appo2/index  访问app02下面的index
})


@app01.route("/login")
def login():
    return "app01.login"


@app02.route("/index")
def index():
    return "app02.index"


if __name__ == '__main__':
    # 请求一旦进来会执行 run_simple 方法的第三个参数加括号:dm(),对象加括号会调用对象的__call__方法
    run_simple("127.0.0.1", 5000, dm)

2、 信号

from flask import Flask, signals

app = Flask(__name__)


def func(*args, **kwargs):
    print("请求开始,触发的app为", *args, **kwargs)


signals.request_started.connect(func)  # 连接信号
"""
含有的全部信号
template_rendered 
before_render_template 
request_started 
request_finished
request_tearing_down
got_request_exception 
appcontext_tearing_down
appcontext_pushed 
appcontext_popped 
message_flashed 
这里可以在源码中查看这些信号的触发条件,源码内部使用send方法触发信号,或者百度也OK
"""

@app.route("/")
def index():
    return "首页面"


if __name__ == '__main__':
    app.run(debug=True)

注意,信号和装饰器的区别是,信号无法控制程序的进行,其只是提供一个提示功能,在原来的基础增加额外的操作和值;而装饰器可以控制请求是否可以继续往后执行

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-07-04 22:49:56  更:2022-07-04 22:51:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/27 2:54:43-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计