【摘要】
在Flask专题的上一章节中,主要对Web应用的路由,异常处理和接口返回做了更进一步的讲解,虽然代码更健壮,但离在生产环境中使用还差了最关键的一步,那就是认证。
认证在任何存在交互的场景中都是十分重要的环节。
【认证】
大家需要先有一个概念,那就是认证其实是两个操作:
- 身份认证
- 权限控制
通俗的说就是:
- 先验证用户是否合法,在Web应用中用户不合法的体现就是
401(Unauthorized) - 再去判断用户是否具有所进行操作的权限,在Web应用中没有权限的提示就是
403(Forbidden) 。
【Flask应用】
身份认证
AK/SK
对于身份认证最简单的方式,就是给调用方一个固定的access_key 和secret_key ,通常也叫做AK/SK,这在系统被第三方调用的场景中是十分常见的。
代码实现也很简单,如下:
@app.route("/index")
def index():
ak = request.headers.get("access_key", "")
sk = request.headers.get("secret_key", "")
if ak != "admin" or sk != "admin_secret":
return "认证失败", 401
pass
上述代码中,假设调用方将AK/SK放在了请求的Headers中,并且我们的后端应用只允许admin 这一个用户调用,如果AK/SK不符,那就返回认证失败,如果成功则可以执行具体的业务逻辑。
相信有的朋友应该已经有了一个想法,那就是将认证的逻辑代码写在路由函数中,那岂不是每个路由函数都得写重复的认证代码?如果没有这个疑问的朋友就需要反思一下了,可以多去看看之前的章节内容。
每个路由函数编写重复的认证逻辑这显然是不合理的,那如何进行优化呢?
可能有的朋友又会觉得,那把认证逻辑抽象成一个单独的函数,每次调用一下不就行了,如下:
from flask import Flask
def permission():
ak = request.headers.get("access_key", "")
sk = request.headers.get("secret_key", "")
if ak == "admin" and sk == "admin_secret":
return True
return False
@app.route("/index")
def index():
if not permission():
return "认证失败", 401
pass
上述代码虽然表面上看起来精简很多,但仍然没有改变将认证逻辑与业务逻辑耦合在一起的事实。
这里大家可以转变一下思路,如果一个路由函数就代表一个业务逻辑,如果需要在执行业务逻辑前做认证,那是不是就相当于要在调用路由函数前做认证?
这样问题的本质就变成了,在调用一个函数前,做一系列的操作,如果合法就调用该函数,如果不合法则不去调用。听起来好像就完全是装饰器的功能(如果还不了解装饰器,强烈建议先去阅读【自动化运维番外篇】- Python装饰器)。代码修改如下:
from functools import wraps
from flask import Flask, request
app = Flask(__name__)
def permission(func):
@wraps(func)
def inner():
ak = request.headers.get("access_key", "")
sk = request.headers.get("secret_key", "")
if ak == "admin" and sk == "admin_secret":
return "认证失败", 401
return func()
return inner
@app.route("/index")
@permission
def index():
pass
注册
现在已经可以对固定的AK/SK进行验证了,那下一步就考虑是否可以让用户通过注册来实现自助获取AK/SK,其实相当于就是注册的功能,下面通过用户名密码的方式来实现注册的功能。
但目前后端应用还没有引入数据库,所以可以暂且通过JSON文件的方式来记录用户信息,在用户调用注册接口时,记录其传递的username/password 到JSON文件中,下次就可以通过检索文件,判断该用户是否合法。代码如下:
import os
import json
from functools import wraps
from hashlib import md5
from flask import Flask, request
app = Flask(__name__)
ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")
def permission(func):
@wraps(func)
def inner():
username = request.form.get("username")
password = request.form.get("password")
with open("accounts.json", "r+") as f:
accounts = json.load(f)
usernames = [account["username"] for account in accounts]
if username not in usernames:
return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
for account in accounts:
if account["username"] == username:
if md5(password.encode()).hexdigest() != account["password"]:
return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
return func()
return inner
@app.route("/register", methods=["POST"])
def register():
"""注册用户信息"""
username = request.form.get("username")
password = request.form.get("password")
if not username or not password:
return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
if not os.path.exists(ACCOUNTS_FILE):
return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
with open("accounts.json", "r+") as f:
accounts = json.load(f)
for account in accounts:
if account["username"] == username:
return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
with open("accounts.json", "w") as f:
json.dump(accounts, f)
return {"data": username, "status_code": "OK", "message": "register username successfully"}
@app.route("/index")
@permission
def index():
return "success"
if __name__ == '__main__':
app.run()
上述代码中指定了一个存放用户信息的文件accounts.json ,需要先将其内容初始化,如果不对文件做初始化,那么在进行json.load() 时就会抛异常,提示文件内容不是合法的json,初始化如下:
[]
先通过os.path.abspath(__file__) 获取到当前启动文件所在的绝对路径,再通过os.path.dirname(os.path.abspath(__file__)) 获取到该绝对路径的目录,最后通过os.path.join() 将目录与account.json 文件名组合在一起,就得到了该文件的绝对路径。
这里我们的用户信息通过数组的方式进行存储,大概模型如下:
[
{"username": "", "password": ""},
{"username": "", "password": ""}
]
注册功能乍一想比较简单,但实际中需要进行的异常判断仍然不少,在注册用户的路由函数中就对多种可预见的异常进行了提前处理,并返回错误信息。并且在保存密码阶段,做了特殊的处理,因为原则上即使是应用方也无权知道用户的真实密码,所以需要在保存用户信息时,对密码做哈希处理,如下:
accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
并且在验证时同样使用密码的哈希去进行比较,如下:
if md5(password.encode()).hexdigest() != account["password"]
最终通过postman调用接口如下:
登录
现在用户已经可以通过注册的方式,在后端应用保存自己的用户名密码,这样每次在请求中携带用户名密码信息就可以通过认证了。
但如果可以实现用户登录的话,用户就可以只登录一次,在登录的有效时间内,都可以进行正常的请求访问,且无需每次请求都传递用户名和密码。
所以代码需要做如下修改:
1.注册逻辑保持不变。
2.新增全局常量LOGIN_TIMEOUT ,设置一个固定的登录有效期。
3.新增全局变量SESSION_IDS 用来记录已登录用户的信息,以及用户的登录时间。
4.新增登录的路由函数,验证用户是否已注册,已注册的用户且用户名密码正确则登录成功,记录该用户的登录信息,并返回生成的session_id 。
5.修改装饰器函数,获取请求头中的session_id 字段,判断用户是否已登录且是否在有效期内,如果超过有效期则将该用户的登录信息从SESSION_IDS 中移除,每次发起请求且认证通过后都更新登录时间戳,以延长登录有效时间
代码如下:
import os
import time
import json
from hashlib import md5
from functools import wraps
from flask import Flask, request
app = Flask(__name__)
ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")
SESSION_IDS = {}
LOGIN_TIMEOUT = 60 * 60 * 24
def permission(func):
@wraps(func)
def inner():
session_id = request.headers.get("session_id", "")
global SESSION_IDS
if session_id not in SESSION_IDS:
return {"data": None, "status_code": "FORBIDDEN", "message": "username not login"}
if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:
SESSION_IDS.pop(session_id)
return {"data": None, "status_code": "FORBIDDEN", "message": "username login timeout"}
SESSION_IDS[session_id] = time.time()
return func()
return inner
@app.route("/register", methods=["POST"])
def register():
"""注册用户信息"""
username = request.form.get("username")
password = request.form.get("password")
if not username or not password:
return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
if not os.path.exists(ACCOUNTS_FILE):
return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
with open("accounts.json", "r+") as f:
accounts = json.load(f)
for account in accounts:
if account["username"] == username:
return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
with open("accounts.json", "w") as f:
json.dump(accounts, f)
return {"data": username, "status_code": "OK", "message": "register username successfully"}
@app.route("/login", methods=["POST"])
def login():
"""用户登录"""
username = request.form.get("username")
password = request.form.get("password")
if not os.path.exists(ACCOUNTS_FILE):
return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
with open("accounts.json", "r+") as f:
accounts = json.load(f)
usernames = [account["username"] for account in accounts]
if username not in usernames:
return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
current_user = None
for account in accounts:
if account["username"] == username:
current_user = account
if md5(password.encode()).hexdigest() != account["password"]:
return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
session_id = md5((password + str(time.time())).encode()).hexdigest()
global SESSION_IDS
SESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}
return {"data": {"session_id": session_id}, "status_code": "OK", "message": "login successfully"}
@app.route("/cmdb", methods=["POST"])
@permission
def index():
pass
return "success"
if __name__ == "__main__":
app.run(host="127.0.0.1", port=5000, debug=True)
通过Postman发起登录请求如下:
完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from flask import Flask, request
app = Flask(__name__)
ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")
SESSION_IDS = {}
LOGIN_TIMEOUT = 60 * 60 * 24
def permission(func):
@wraps(func)
def inner():
session_id = request.headers.get("session_id", "")
global SESSION_IDS
if session_id not in SESSION_IDS:
return {"data": None, "status_code": "FORBIDDEN", "message": "username not login"}
if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:
SESSION_IDS.pop(session_id)
return {"data": None, "status_code": "FORBIDDEN", "message": "username login timeout"}
SESSION_IDS[session_id] = time.time()
return func()
return inner
@app.route("/register", methods=["POST"])
def register():
""" 注册用户信息 """
username = request.form.get("username")
password = request.form.get("password")
if not username or not password:
return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
if not os.path.exists(ACCOUNTS_FILE):
return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
with open("accounts.json", "r+") as f:
accounts = json.load(f)
for account in accounts:
if account["username"] == username:
return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
with open("accounts.json", "w") as f:
json.dump(accounts, f)
return {"data": username, "status_code": "OK", "message": "register username successfully"}
@app.route("/login", methods=["POST"])
def login():
"""用户登录"""
username = request.form.get("username")
password = request.form.get("password")
if not os.path.exists(ACCOUNTS_FILE):
return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
with open("accounts.json", "r+") as f:
accounts = json.load(f)
usernames = [account["username"] for account in accounts]
if username not in usernames:
return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
current_user = None
for account in accounts:
if account["username"] == username:
current_user = account
if md5(password.encode()).hexdigest() != account["password"]:
return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
session_id = md5((password + str(time.time())).encode()).hexdigest()
global SESSION_IDS
SESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}
return {"data": {"session_id": session_id}, "status_code": "OK", "message": "login successfully"}
@app.route("/index", methods=["GET"])
@permission
def index():
pass
return "success"
if __name__ == "__main__":
app.run(host="127.0.0.1", port=5000, debug=True)
【总结】
这一章节主要讲解了关于用户身份认证的原理和具体实现,关于权限校验下一章节再详细介绍。
其实时Flask中也有第三方插件可以实现登录,叫做flask-login ,感兴趣的同学可以了解一下,但我们的目的主要是要了解身份认证的具体逻辑,而不是当一个“调包侠”。
除此之外,有一点需要大家仔细思考的是,编程是面对的计算机,所以对于逻辑的严谨和异常的处理,都需要十分仔细,平时在实现某个功能时并不是说通过正向思维,把这个功能的具体逻辑用代码翻译过来就可以了;而是要对这个功能的流程中所牵扯到的各种边界情况做充足的考虑。
最后留一个思考题,如果大家能仔细思考各种边界条件的话,会发现最终的代码存在一个并发问题:
当很多人同时发起请求进行注册的时候,是否会导致文件出现写冲突,如果会,那应用如何解决呢?
欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容
|