Flask常用组件
一、 flask_session
session 是基于cookie实现, 保存在服务端的键值对(形式为 {随机字符串:'uuid'} ), 同时在浏览器中的cookie中也对应一相同的随机字符串,用来再次请求的 时候验证,这个组件的作用是将session数据存储到数据库中
1、 常用配置
SECRET_KEY = os.urandom(32)
SESSION_USE_SIGNER = True
PERMANENT_SESSION_LIFETIME = timedelta(minutes=30)
SESSION_REFRESH_EACH_REQUEST = True
SESSION_TYPE = "redis"
SESSION_REDIS = Redis("127.0.0.1")
SESSION_PREFIX = "MyWebPrefix:"
注意:session 中存储的是字典,当修改字典内部(第二级)元素时,会造成数据不更新
解决方法:
我们要先阅读一下部分源码
class SecureCookieSession(CallbackDict, SessionMixin):
modified = False
accessed = False
def __init__(self, initial: t.Any = None) -> None:
def on_update(self) -> None:
self.modified = True
"""
if not (session.modified # 如果session修改,就会重新设置到cookies中
or
(
session.permanent # 是否设置到cookie里面
and
app.config["SESSION_REFRESH_EACH_REQUEST"] # 如果为True,则每次请求刷新一次
)):
return
...
response.set_cookie(...)
"""
self.accessed = True
"""
if session.accessed:
response.vary.add("Cookie")
"""
super().__init__(initial, on_update)
解决方法:
-
方法一 session.modified = True
-
方法二 app.config["SESSION_REFRESH_EACH_REQUEST"] = True
session.permanent = True
2、 使用方法
2.1 session_interface
通过session_interface 来设置
from flask_session import RedisSessionInterface
from flask import Flask
from os import urandom
from redis import Redis
app = Flask(__name__)
app.serect_key = urandom(32)
app.config["SESSION_USE_SIGNER"] = True
app.session_interface = RedisSessionInterface(
redis=Redis("127.0.0.1"),
key_prefix="flask_login",
)
默认session设置为
from flask.sessions import SecureCookieSessionInterface
app.session_interface = SecureCookieSessionInterface()
修改后,就可以直接使用了,其使用方法和原先的一样
2.2 config
通过配置文件来设置
from flask_session import Session
from flask import Flask
from os import urandom
from redis import Redis
settings = {
"SESSION_TYPE": "redis",
"SESSION_REDIS": Redis("127.0.0.1"),
"SECRET_KEY": os.urandom(32),
"SESSION_USE_SIGNER": True,
"SESSION_REFRESH_EACH_REQUEST": True,
}
app.config.from_mapping(settings)
Session(app)
二、 DBUtils
1、 引言
当我们要对数据库进行操作时,可以这么干:
import pymysql
from functools import wraps
SQL_CONFIG = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
}
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
conn = pymysql.connect(**SQL_CONFIG)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
return inner
class SQLTool:
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
if __name__ == '__main__':
obj = SQLTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)
问题来了,如果有很多连接的话,开启数据库再关闭是不是很麻烦呢?
2、 DBUtils
使用数据库连接池
此连接池有两种模式:
- 为每一个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新发到连接池,供自己线程再次使用。当线程终止,连接自动关闭
- 创建一批连接到连接池,供所有线程共享使用(主要)
2.1 模式一
POOL = PersistentDB(
creator=pymysql,
maxusage=None,
setsession=[],
ping=0,
closeable=False,
threadlocal=None,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
conn = POOL.connection(shareable=False)
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
cursor.close()
conn.close()
func()
2.2 模式二
import time
import pymysql
import threading
from dbutils.pooled_db import PooledDB, SharedDBConnection
POOL = PooledDB(
creator=pymysql,
maxconnections=6,
mincached=2,
maxcached=5,
maxshared=3,
blocking=True,
maxusage=None,
setsession=[],
ping=0,
host='127.0.0.1',
port=3306,
user='root',
password='123',
database='pooldb',
charset='utf8'
)
def func():
conn = POOL.connection()
cursor = conn.cursor()
cursor.execute('select * from tb1')
result = cursor.fetchall()
conn.close()
func()
3、 代码封装
import pymysql
from functools import wraps
from dbutils.pooled_db import PooledDB
import pymysql
SQL_CONFIG = {
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
}
POOL_CONFIG = {
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 0,
}
POOL = PooledDB(
creator=pymysql,
**SQL_CONFIG,
**POOL_CONFIG
)
def select_sql(type_):
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
if type_ == "sql":
"""如果通过数据库来获取值的话"""
conn = pymysql.connect(**SQL_CONFIG)
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
elif type_ == "pool":
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
raise ValueError("type_ value error, value = pool or sql")
return inner
return sql_outer
class SQLTool:
"""使用pymysql单线程连接"""
@staticmethod
@select_sql("sql")
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@select_sql("sql")
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@select_sql("pool")
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@select_sql("pool")
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
obj = DBUtilsTool.fetch_one("SELECT id, name FROM users WHERE name=%s and pwd=%s", ["kun", "123"])
print(obj)
4、 结合flask使用
4.1 方式一
无法创建的原因是flask的上下文管理机制,当flask 还没run() 的时候,无法访问current_app 里面的配置信息
将配置信息放到settings 配置文件中,再使用init_app 方法——模拟Session(app) ,把数据库连接池放到配置文件config 里面
在存放数据库连接池的文件sql.py 中:
import pymysql
from functools import wraps
from flask import current_app
from dbutils.pooled_db import PooledDB
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
POOL = current_app.config["SQL_POOL"]
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池,模拟Session(app)"""
def __init__(self, app=None):
self.app = app
if app:
self.init_app(app)
def init_app(self, app):
POOL = PooledDB(
**app.config["POOL_CONFIG"]
)
app.config["SQL_POOL"] = POOL
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
在settings.py 文件中:
import pymysql
class BaseConfig:
POOL_CONFIG = {
"creator": pymysql,
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 4,
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
}
在manage.py 主运行程序中
from sql import DBUtilsTool
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.from_object("settings.BaseConfig")
DBUtilsTool(app)
return app
if __name__ == '__main__':
app = create_app()
app.run("0.0.0.0")
4.2 方式二
直接将数据库连接池添加到配置文件中
在sql.py 中
import pymysql
from functools import wraps
from flask import current_app
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
POOL = current_app.config["SQL_POOL"]
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
在settings.py 中
from dbutils.pooled_db import PooledDB
class BaseConfig:
SQL_POOL = PooledDB(
"creator": pymysql,
"maxconnections": 6,
"mincached": 2,
"maxcached": 5,
"maxshared": 3,
"blocking": True,
"maxusage": None,
"setsession": [],
"ping": 4,
"host": "127.0.0.1",
"port": 3306,
"user": "root",
"passwd": "qwe123",
"db": "flask1",
"charset": "utf8",
)
在manage.py 中
from sql import DBUtilsTool
from flask import Flask
def create_app():
app = Flask(__name__)
app.config.from_object("settings.BaseConfig")
return app
if __name__ == '__main__':
app = create_app()
app.run("0.0.0.0")
不建议使用方法二,其没有做到配置与程序分离的效果
4.3 方法三
使用pool.py 在里面创建POOL
from flask import current_app
POOL = PooledDB(
**current_app.config["POOL_CONFIG"]
)
在sql.py 中
import pymysql
from functools import wraps
def sql_outer(fun):
"""使用装饰器,可以对数据库便捷操作"""
@wraps(fun)
def inner(sql, *args):
from .pool import POOL
conn = POOL.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
obj = fun(sql, cursor, *args)
conn.commit()
cursor.close()
conn.close()
return obj
return inner
class DBUtilsTool:
"""使用数据库连接池"""
@staticmethod
@sql_outer
def fetch_all(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchall()
return obj
@staticmethod
@sql_outer
def fetch_one(sql: "sql 语句", cursor=None, *args: "要查询的数据"):
cursor.execute(sql, *args)
obj = cursor.fetchone()
return obj
要注意current_app 的创建时期,其他的使用方法都一样
只要写原生SQL,就要使用数据库连接池
三、 wtforms
作用:专门用于对python web 框架做表单验证
1、 支持的字段
字段 | 描述 |
---|
StringField | 文本字段 | TextAreaField | 多行文本字段 | PasswordField | 密码文本字段 | HiddenField | 隐藏文本字段 | DateField | 文本字段,值为 datetime.date 格式 | DateTimeField | 文本字段,值为 datetime.datetime 格式 | IntegerField | 文本字段,值为整数 | DecimalField | 文本字段,值为 decimal.Decimal | FloatField | 文本字段,值为浮点数 | BooleanField | 复选框,值为 True 和 False | RadioField | 一组单选框 | SelectField | 下拉列表 | SelectMultipleField | 下拉列表,可选择多个值 | FileField | 文件上传字段 | SubmitField | 表单提交按钮 | FormField | 把表单作为字段嵌入另一个表单 | FieldList | 一组指定类型的字段 |
2、 字段参数
参数 | 描述 |
---|
label | 字段别名,在页面中可以通过字段.label展示 | validator | 验证规则列表 | filters | 过氯器列表,用于对提交数据进行过滤 | description | 描述信息,通常用于生成帮助信息 | id | 表示在form类定义时候字段的位置,通常你不需要定义它,默认会按照定义的先后顺序排序 | default | 默认值 | widget | html插件,通过该插件可以覆盖默认的插件,更多通过用户自定义 | render_kw | 自定义html属性 | choices | 复选类型的选项 |
3、 验证函数
验证函数 | 说明 |
---|
Email | 验证是电子邮件地址 | EqualTo | 比较两个字段的值; 常用于要求输入两次密钥进行确认的情况 | IPAddress | 验证IPv4网络地址 | Length | 验证输入字符串的长度 | NumberRange | 验证输入的值在数字范围内 | Optional | 无输入值时跳过其它验证函数 | DataRequired | 确保字段中有数据 | Regexp | 使用正则表达式验证输入值 | URL | 验证url | AnyOf | 确保输入值在可选值列表中 | NoneOf | 确保输入值不在可选列表中 |
可以根据验证函数的源码,来自定义验证函数
4、 代码实例
创建一个上传文件功能的app ,里面包含了对其工作流程的关键步骤分析,希望对理解有用
代码实例:
在app.py 中
from flask import Flask
from wtforms import FileField, Form, SubmitField, widgets, validators
from flask import (request, render_template, current_app)
from werkzeug.utils import secure_filename
import os
class UploadForm(Form):
"""创建上传文件的字段"""
file1 = FileField(
render_kw={"class": "file"},
widget=widgets.FileInput(),
validators=[
validators.DataRequired(message="请选择要上传的文件哦!")
]
)
sbtn = SubmitField(
render_kw={
"id": "upload_submit",
"value": "上传文件",
},
widget=widgets.SubmitInput(),
)
config = {
"UPLOAD_FOLDER": "upload/",
}
app = Flask(__name__)
app.config.from_mapping(config)
@app.route("/upload", methods=['GET', 'POST'])
def upload():
if request.method == "POST":
form = UploadForm(formdata=request.files)
if form.validate():
f = form.file1.data
f.save(os.path.join(current_app.config['UPLOAD_FOLDER'], secure_filename(f.filename)))
return render_template("index.html", form=form, msg=f"上传文件成功!上传的文件为{f.filename}")
else:
return render_template("index.html", form=form, msg="请选择文件哦!")
form = UploadForm()
"""
# 对该类创建时的源码分析
metaclass=FormMeta # 其为元类
# 故,创建对象时
1、 FormMeta.__call__
# 在 __call__ 方法里面进行的步骤
UploadForm._unbound_fields = None
UploadForm._wtforms_meta = None
_unbound_fields = [ # 其根据counter来排序
("checkbox": UnboundField(BooleanField, *args, **kwargs, creation_counter=1))
("sbtn": UnboundField(SubmitField, *args, **kwargs, creation_counter=1))
]
_wtforms_meta = type("Meta", tuple([DefaultMeta]), {}) # DefaultMeta = Form.Meta
= class Meta(DefaultMeta):
pass
2、 UploadForm.__new__
# 在 __new__ 方法里面进行的步骤,然后发现没有 __new__ 方法,除非自定义
pass
3、 UploadForm.__init__
# 执行 __init__ 方法
UploadForm()._fields = {
"file1": FileField(...),
"sbtn": SubmitField(...)
}
UploadForm().name=FileField(...)
UploadForm().sbtn=SubmitField(...)
"""
print(form.file1)
"""
# 访问类的属性时的源码分析
# 故,form.name 执行的是字段类中的 __str__ 方法
Field.__str__ -> return self() # 执行字段的 __call__ 方法
Field.__call__ -> return self.meta.render_field(self, kwargs)
DefaultMeta.render_field(self, kwargs) -> return field.widget(field, **render_kw) # 执行 widget 的 __call__ 方法
Input.__call__ -> return Markup("<input %s>" % self.html_params(name=field.name, **kwargs)) # 进行渲染
"""
for i in form:
print(i)
"""
# 其内部有一个iter方法
BaseField.__iter__ -> iter(self._fields.values())
print(i) # 执行字段内部的 __str__ 方法
"""
return render_template("index.html", form=form)
if __name__ == '__main__':
app.run()
还有验证流程,可以自己尝试一下
在templates/index.html 中
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>文件上传</title>
</head>
<body>
<div class="uploadFile">
<h1>请选择要上传的文件</h1>
<form method="POST" enctype="multipart/form-data">
<p>{{ form.file1 }}</p>
<p style="color: red">{{ form.file1.errors[0] }}</p>
<p>{{ form.sbtn }}</p>
</form>
<p style="color: green">{{ msg }}</p>
</div>
</body>
</html>
四、 flask_mail
1、 简介
在web程序中,经常会需要发送电子邮件。比如,在用户注册账户时发送确认邮件;定期向用户发送热门内容或是促销信息等等。在Web程序中发送电子邮件并不复杂,借助扩展Flask-Mail或是第三方邮件服务,只需要几行代码就可以发送邮件
配置信息:
参数 | 描述 |
---|
MAIL_SERVER | 邮件服务器的名称/IP地址 | MAIL_PORT | 所用服务器的端口号 | MAIL_USE_TLS | 启用/禁用传输安全层加密 | MAIL_USE_SSL | 启用/禁用安全套接字层加密 | MAIL_DEBUG | 调试支持,默认是Flask应用程序的调试状态 | MAIL_USERNAME | 发件人的用户名 | MAIL_PASSWORD | 发件人的密码 | MAIL_DEFAULT_SENDER | 设置默认发件人 | MAIL_MAX_EMAILS | 设置要发送的最大邮件 | MAIL_SUPPRESS_SEND | 如果app.testing设置为true,则发送被抑制 | MAIL_ASCII_ATTACHMENTS | 如果设置为true,则将附加的文件名转换为ASCII |
2、 Mail
它管理电子邮件消息的要求。 类构造函数采用以下形式
方法 | 描述 |
---|
send() | 发送Message类对象的内容 | connect() | 与邮件主机打开连接 | send_message() | 发送消息对象 |
3、 Massage
3.1 实例化对象
Message(
subject='',
recipients=[],
body=None,
html=None,
sender=None,
cc=None,
bcc=None,
attachments=None,
reply_to=None,
date=None,
charset=None,
extra_headers=None,
mail_options=None,
rcpt_options=None
)
3.2 类方法
attach() - 向消息添加附件。 该方法采用以下参数:
filename - 要附加的文件的名称content_type - 文件的MIME类型data - 原始文件数据disposition - 内容处置,如果有的话
add_recipient() - 向消息添加另一个收件人
4、 使用方法
-
初始化邮箱 from flask_mail import Mail, Message
mail = Mail(app)
-
配置邮箱信息 class BaseConfig2():
MAIL_SERVER = "smtp.qq.com"
MAIL_PORT = 465
MAIL_USE_TLS = False
MAIL_USER_SSL = True
MAIL_USERNAME = "liu.zhong.kun@oxmail.com"
MAIL_PASSWORD = "xtisaddfdfntdcjf"
MAIL_DEFAULT_SENDER = "A.L.Kun<liu.zhong.kun@oxmail.com>"
-
创建信息 msg = Message(subject="This is title", recipients="3500515050@qq.com")
msg.body = "This is body"
-
发送信息 mail.send(msg)
总代码:
__author__ = "A.L.Kun"
__file__ = "app_.py"
__time__ = "2022/6/24 19:56"
__email__ = "liu.zhong.kun@foxmail.com"
from settings import BaseConfig2
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
app.config.from_object(BaseConfig2)
mail = Mail(app)
@app.route('/')
def hello():
msg = Message(subject="This is title", recipients="3500515050@qq.com")
msg.body = "This is body"
message.html = render_template('content.html')
mail.send(msg)
return "信息发送完成!"
if __name__ == '__main__':
app.run(debug=True)
五、 flask_script
1、 简介
安装:pip install flask-script
通过使用Flask-Script扩展,我们可以在Flask服务器启动的时候,通过命令行的方式传入参数。而不仅仅通过app.run()方法中传参,比如我们可以通过python hello.py runserver –host ip地址,告诉服务器在哪个网络接口监听来自客户端的连接
2、 启动服务
from flask import Flask
app = Flask(__name__)
"""使用flask-script启动项目"""
from flask_script import Manager
manager = Manager(app)
@app.route('/')
def index():
return 'hello world'
if __name__ == "__main__"
manager.run()
在命令行输入python manage.py runserver ,即可启动服务
3、 传入参数
from flaskScript import create_app
from flask_script import Manager
app = create_app()
manager = Manager(app)
@manager.command
def custom(arg):
"""
这个方法可以接收从命令行输入的参数,位置参数
如:
运行:python manage.py custom 123
其会在控制台打印 123
"""
print(arg)
@manager.option("-n", "--name", dest='name')
@manager.option("-u", "--url", dest="url")
def cmd_(name, url):
"""
自定义命令,可选参数
:param name:从命令行传入的姓名
:param url: 从命令行传入的url
:return:
如:
输入:python manage.py cmd_ -n "lihua" -u "127.0.0.1"
则会输出:lihua 127.0.0.1
"""
print(name, url)
if __name__ == '__main__':
manager.run()
作用:
python manage.py runserver ... python manage.py 自定义命令
六、 flask_sqlalchemy
1、 简介
sqlalchemy 这里面有sqlalchemy 的一些基本操作
flask中一般使用flask-sqlalchemy来操作数据库,使用起来比较简单,易于操作
安装:
pip install flask-sqlalchemy
配置参数
配置选项 | 说明 |
---|
SQLALCHEMY_DATABASE_URI | 连接数据库。示例:mysql://username:password@host/post/db?charset=utf-8 | SQLALCHEMY_BINDS | 一个将会绑定多种数据库的字典。 更多详细信息请看官文 绑定多种数据库. | SQLALCHEMY_ECHO | 调试设置为true | SQLALCHEMY_POOL_SIZE | 数据库池的大小,默认值为5。 | SQLALCHEMY_POOL_TIMEOUT | 连接超时时间 | SQLALCHEMY_POOL_RECYCLE | 自动回收连接的秒数。 | SQLALCHEMY_MAX_OVERFLOW | 控制在连接池达到最大值后可以创建的连接数。当这些额外的 连接回收到连接池后将会被断开和抛弃。 | SQLALCHEMY_TRACK_MODIFICATIONS | 如果设置成 True (默认情况),Flask-SQLAlchemy 将会追踪对象的修改并且发送信号。这需要额外的内存, 如果不必要的可以禁用它。 |
2、 使用步骤
-
配置数据库信息,在settings.py 文件中添加 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:qew123@127.0.0.1:3306/flask1?charset=utf8"
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ECHO = True
-
创建一个app.py 文件,在里面添加 from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
app = Flask(__name__)
app.config.from_file("settings.py")
db.init_app(app)
-
编写数据库结构,创建一个models.py ,与app.py 同级 from app import db
class Users(db.Model):
__table__ = "user"
id = db.Column(db.INTEGER, primary_key=True, autoincrement=True)
name = db.Column(db.String(32))
-
在路由中使用 import models
@app.route("/login", methods=["GET", "POST"])
def login():
data = db.session.query(models.Users).all()
print(data)
db.session.remove()
return "Login"
扩展,离线脚本的使用
from app import app
with app.app_context():
pass
七、 flask_migrate
作用:做数据库迁移
其依赖于:flask-script/flask-sqlalchemy
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_script import Manager
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite:///app.db"
db = SQLAlchemy(app)
manager = Manager(app)
Migrate(app, db)
"""
flask db init # 初始化表
flask db migrate # 将表在数据库中创建出来
flask db upgrade # 更新表的结构
"""
class User(db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(128))
if __name__ == '__main__':
manager.run()
八、 自定义组件
在auto.py 中添加:
from flask import request, session, redirect
class Auth:
def __init__(self, app=None):
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
app.auto = self
self.app = app
self.app.before_request(self.check_login)
def check_login(self):
print("检测用户是否登录")
usr = session.get("usr")
print(usr)
def login(self, data):
"""创建session"""
session["usr"] = data
def login_out(self):
"""用户登出"""
del session["usr"]
使用这个组件时:
from auto import Auto
from flask import Flask
app = Flask(__name__)
at = Auto()
at.init_app(app)
from flask import current_app
cerrent_app.login_out()
最后,总目录结构为:
点击我,查看源代码
九、 其它
1、 多app应用
from flask import Flask
from werkzeug.middleware.dispatcher import DispatcherMiddleware
from werkzeug.serving import run_simple
app01 = Flask("app01")
app02 = Flask("app02")
dm = DispatcherMiddleware(app01, {
"/app02": app02
})
@app01.route("/login")
def login():
return "app01.login"
@app02.route("/index")
def index():
return "app02.index"
if __name__ == '__main__':
run_simple("127.0.0.1", 5000, dm)
2、 信号
from flask import Flask, signals
app = Flask(__name__)
def func(*args, **kwargs):
print("请求开始,触发的app为", *args, **kwargs)
signals.request_started.connect(func)
"""
含有的全部信号
template_rendered
before_render_template
request_started
request_finished
request_tearing_down
got_request_exception
appcontext_tearing_down
appcontext_pushed
appcontext_popped
message_flashed
这里可以在源码中查看这些信号的触发条件,源码内部使用send方法触发信号,或者百度也OK
"""
@app.route("/")
def index():
return "首页面"
if __name__ == '__main__':
app.run(debug=True)
注意,信号和装饰器的区别是,信号无法控制程序的进行,其只是提供一个提示功能,在原来的基础增加额外的操作和值;而装饰器可以控制请求是否可以继续往后执行
|