IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> 【自动化运维新手村】Flask-2认证 -> 正文阅读

[Python知识库]【自动化运维新手村】Flask-2认证

【摘要】

在Flask专题的上一章节中,主要对Web应用的路由,异常处理和接口返回做了更进一步的讲解,虽然代码更健壮,但离在生产环境中使用还差了最关键的一步,那就是认证

认证在任何存在交互的场景中都是十分重要的环节

【认证】

大家需要先有一个概念,那就是认证其实是两个操作

  1. 身份认证
  2. 权限控制

通俗的说就是:

  1. 先验证用户是否合法,在Web应用中用户不合法的体现就是401(Unauthorized)
  2. 再去判断用户是否具有所进行操作的权限,在Web应用中没有权限的提示就是403(Forbidden)

【Flask应用】

身份认证

AK/SK

对于身份认证最简单的方式,就是给调用方一个固定的access_keysecret_key,通常也叫做AK/SK,这在系统被第三方调用的场景中是十分常见的。

代码实现也很简单,如下:

@app.route("/index")
def index():
    ak = request.headers.get("access_key", "")
    sk = request.headers.get("secret_key", "")
    if ak != "admin" or sk != "admin_secret":
         return "认证失败", 401
    # 具体的业务逻辑
    pass

上述代码中,假设调用方将AK/SK放在了请求的Headers中,并且我们的后端应用只允许admin这一个用户调用,如果AK/SK不符,那就返回认证失败,如果成功则可以执行具体的业务逻辑。

相信有的朋友应该已经有了一个想法,那就是将认证的逻辑代码写在路由函数中,那岂不是每个路由函数都得写重复的认证代码?如果没有这个疑问的朋友就需要反思一下了,可以多去看看之前的章节内容。

每个路由函数编写重复的认证逻辑这显然是不合理的,那如何进行优化呢?

可能有的朋友又会觉得,那把认证逻辑抽象成一个单独的函数,每次调用一下不就行了,如下:

from flask import Flask

def permission():
    ak = request.headers.get("access_key", "")
    sk = request.headers.get("secret_key", "")
    if ak == "admin" and sk == "admin_secret":
         return True
    return False

@app.route("/index")
def index():
    if not permission():
          return "认证失败", 401
    # 具体的业务逻辑
    pass

上述代码虽然表面上看起来精简很多,但仍然没有改变将认证逻辑与业务逻辑耦合在一起的事实。

这里大家可以转变一下思路,如果一个路由函数就代表一个业务逻辑,如果需要在执行业务逻辑前做认证,那是不是就相当于要在调用路由函数前做认证?

这样问题的本质就变成了,在调用一个函数前,做一系列的操作,如果合法就调用该函数,如果不合法则不去调用。听起来好像就完全是装饰器的功能(如果还不了解装饰器,强烈建议先去阅读【自动化运维番外篇】- Python装饰器)。代码修改如下:

from functools import wraps
from flask import Flask, request

app = Flask(__name__)

def permission(func):
    @wraps(func)
    def inner():
        ak = request.headers.get("access_key", "")
    		sk = request.headers.get("secret_key", "")
    		if ak == "admin" and sk == "admin_secret":
            return "认证失败", 401
        return func()
    return inner
  
@app.route("/index")
@permission
def index():
    # 具体的业务逻辑
    pass
注册

现在已经可以对固定的AK/SK进行验证了,那下一步就考虑是否可以让用户通过注册来实现自助获取AK/SK,其实相当于就是注册的功能,下面通过用户名密码的方式来实现注册的功能。

但目前后端应用还没有引入数据库,所以可以暂且通过JSON文件的方式来记录用户信息,在用户调用注册接口时,记录其传递的username/password 到JSON文件中,下次就可以通过检索文件,判断该用户是否合法。代码如下:

import os
import json
from functools import wraps
from hashlib import md5
from flask import Flask, request

app = Flask(__name__)
ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")


def permission(func):
    @wraps(func)
    def inner():
        username = request.form.get("username")
        password = request.form.get("password")
        with open("accounts.json", "r+") as f:
            accounts = json.load(f)
        usernames = [account["username"] for account in accounts]
        if username not in usernames:  # 判断是否用户已存在
            return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
        for account in accounts:
            if account["username"] == username:
                if md5(password.encode()).hexdigest() != account["password"]:  # 判断用户名和密码是否一致
                    return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
                return func()
    return inner


@app.route("/register", methods=["POST"])
def register():
    """注册用户信息"""
    username = request.form.get("username")
    password = request.form.get("password")
    if not username or not password:  # 判断用户输入的参数
        return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
    if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件
        return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
    with open("accounts.json", "r+") as f:
        accounts = json.load(f)
    for account in accounts:
        if account["username"] == username:  # 判断是否用户已存在
            return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
    accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
    with open("accounts.json", "w") as f:
        json.dump(accounts, f)
    return {"data": username, "status_code": "OK", "message": "register username successfully"}


@app.route("/index")
@permission
def index():
    # 具体的业务逻辑
    return "success"


if __name__ == '__main__':
    app.run()

上述代码中指定了一个存放用户信息的文件accounts.json,需要先将其内容初始化,如果不对文件做初始化,那么在进行json.load()时就会抛异常,提示文件内容不是合法的json,初始化如下:

// accounts.json 文件
[]

先通过os.path.abspath(__file__)获取到当前启动文件所在的绝对路径,再通过os.path.dirname(os.path.abspath(__file__))获取到该绝对路径的目录,最后通过os.path.join()将目录与account.json文件名组合在一起,就得到了该文件的绝对路径。

这里我们的用户信息通过数组的方式进行存储,大概模型如下:

[
  {"username": "", "password": ""},
  {"username": "", "password": ""}
]

注册功能乍一想比较简单,但实际中需要进行的异常判断仍然不少,在注册用户的路由函数中就对多种可预见的异常进行了提前处理,并返回错误信息。并且在保存密码阶段,做了特殊的处理,因为原则上即使是应用方也无权知道用户的真实密码,所以需要在保存用户信息时,对密码做哈希处理,如下:

accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})

并且在验证时同样使用密码的哈希去进行比较,如下:

if md5(password.encode()).hexdigest() != account["password"]

最终通过postman调用接口如下:
在这里插入图片描述

登录

现在用户已经可以通过注册的方式,在后端应用保存自己的用户名密码,这样每次在请求中携带用户名密码信息就可以通过认证了。

但如果可以实现用户登录的话,用户就可以只登录一次,在登录的有效时间内,都可以进行正常的请求访问,且无需每次请求都传递用户名和密码。

所以代码需要做如下修改:

1.注册逻辑保持不变。

2.新增全局常量LOGIN_TIMEOUT,设置一个固定的登录有效期。

3.新增全局变量SESSION_IDS用来记录已登录用户的信息,以及用户的登录时间。

4.新增登录的路由函数,验证用户是否已注册,已注册的用户且用户名密码正确则登录成功,记录该用户的登录信息,并返回生成的session_id

5.修改装饰器函数,获取请求头中的session_id字段,判断用户是否已登录且是否在有效期内,如果超过有效期则将该用户的登录信息从SESSION_IDS中移除,每次发起请求且认证通过后都更新登录时间戳,以延长登录有效时间

代码如下:

import os
import time
import json
from hashlib import md5
from functools import wraps
from flask import Flask, request

app = Flask(__name__)

ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")

SESSION_IDS = {}

LOGIN_TIMEOUT = 60 * 60 * 24

def permission(func):
    @wraps(func)
    def inner():
        session_id = request.headers.get("session_id", "")
        global SESSION_IDS
        if session_id not in SESSION_IDS:  # 是否存在会话信心
            return {"data": None, "status_code": "FORBIDDEN", "message": "username not login"}
        if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:  # 是否会话仍有效
            SESSION_IDS.pop(session_id)  # 如果失效则移除会话信息
            return {"data": None, "status_code": "FORBIDDEN", "message": "username login timeout"}
        SESSION_IDS[session_id] = time.time()  # 更新会话时间
        return func()
    return inner


@app.route("/register", methods=["POST"])
def register():
    """注册用户信息"""
    username = request.form.get("username")
    password = request.form.get("password")
    if not username or not password:  # 判断用户输入的参数
        return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
    if not os.path.exists(ACCOUNTS_FILE):  # 判断是否存在指定文件
        return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
    with open("accounts.json", "r+") as f:
        accounts = json.load(f)
    for account in accounts:
        if account["username"] == username:  # 判断是否用户已存在
            return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
    accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
    with open("accounts.json", "w") as f:
        json.dump(accounts, f)
    return {"data": username, "status_code": "OK", "message": "register username successfully"}


@app.route("/login", methods=["POST"])
def login():
    """用户登录"""
    username = request.form.get("username")
    password = request.form.get("password")
    if not os.path.exists(ACCOUNTS_FILE):  # 是否存在用户信息文件
        return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
    with open("accounts.json", "r+") as f:
        accounts = json.load(f)
    usernames = [account["username"] for account in accounts]
    if username not in usernames:  # 是否用户已注册
        return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
    current_user = None
    for account in accounts:
        if account["username"] == username:
            current_user = account
            if md5(password.encode()).hexdigest() != account["password"]:  # 是否用户名密码正确
                return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
            session_id = md5((password + str(time.time())).encode()).hexdigest()  # 生成会话ID
            global SESSION_IDS
            SESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}  # 记录会话信息
            return {"data": {"session_id": session_id}, "status_code": "OK", "message": "login successfully"}


@app.route("/cmdb", methods=["POST"])
@permission
def index():
    pass
    return "success"


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000, debug=True)

通过Postman发起登录请求如下:

在这里插入图片描述
在这里插入图片描述

完整代码
import os
import time
import json
from hashlib import md5
from functools import wraps
from flask import Flask, request

app = Flask(__name__)

ACCOUNTS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "accounts.json")

SESSION_IDS = {}

LOGIN_TIMEOUT = 60 * 60 * 24

def permission(func):
    @wraps(func)
    def inner():
        session_id = request.headers.get("session_id", "")
        global SESSION_IDS
        if session_id not in SESSION_IDS:
            return {"data": None, "status_code": "FORBIDDEN", "message": "username not login"}
        if SESSION_IDS[session_id]["timestamp"] - time.time() > LOGIN_TIMEOUT:
            SESSION_IDS.pop(session_id)
            return {"data": None, "status_code": "FORBIDDEN", "message": "username login timeout"}
        SESSION_IDS[session_id] = time.time()
        return func()
    return inner


@app.route("/register", methods=["POST"])
def register():
    """ 注册用户信息 """
    username = request.form.get("username")
    password = request.form.get("password")
    if not username or not password:
        return {"data": None, "status_code": "InvalidParams", "message": "must have username and password"}
    if not os.path.exists(ACCOUNTS_FILE):
        return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
    with open("accounts.json", "r+") as f:
        accounts = json.load(f)
    for account in accounts:
        if account["username"] == username:
            return {"data": None, "status_code": "Duplicated", "message": "username is already exists"}
    accounts.append({"username": username, "password": md5(password.encode()).hexdigest()})
    with open("accounts.json", "w") as f:
        json.dump(accounts, f)
    return {"data": username, "status_code": "OK", "message": "register username successfully"}


@app.route("/login", methods=["POST"])
def login():
    """用户登录"""
    username = request.form.get("username")
    password = request.form.get("password")
    if not os.path.exists(ACCOUNTS_FILE):
        return {"data": None, "status_code": "NotFound", "message": "not found accounts file"}
    with open("accounts.json", "r+") as f:
        accounts = json.load(f)
    usernames = [account["username"] for account in accounts]
    if username not in usernames:
        return {"data": None, "status_code": "NotFound", "message": "username is not exists"}
    current_user = None
    for account in accounts:
        if account["username"] == username:
            current_user = account
            if md5(password.encode()).hexdigest() != account["password"]:
                return {"data": None, "status_code": "Unauthorized", "message": "password is not correct"}
            session_id = md5((password + str(time.time())).encode()).hexdigest()
            global SESSION_IDS
            SESSION_IDS[session_id] = {"user_info": current_user, "timestamp": time.time()}
            return {"data": {"session_id": session_id}, "status_code": "OK", "message": "login successfully"}


@app.route("/index", methods=["GET"])
@permission
def index():
    pass
    return "success"


if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000, debug=True)

【总结】

这一章节主要讲解了关于用户身份认证的原理和具体实现,关于权限校验下一章节再详细介绍。

其实时Flask中也有第三方插件可以实现登录,叫做flask-login,感兴趣的同学可以了解一下,但我们的目的主要是要了解身份认证的具体逻辑,而不是当一个“调包侠”。

除此之外,有一点需要大家仔细思考的是,编程是面对的计算机,所以对于逻辑的严谨和异常的处理,都需要十分仔细,平时在实现某个功能时并不是说通过正向思维,把这个功能的具体逻辑用代码翻译过来就可以了;而是要对这个功能的流程中所牵扯到的各种边界情况做充足的考虑。

最后留一个思考题,如果大家能仔细思考各种边界条件的话,会发现最终的代码存在一个并发问题:

当很多人同时发起请求进行注册的时候,是否会导致文件出现写冲突,如果会,那应用如何解决呢?


欢迎大家添加我的个人公众号【Python玩转自动化运维】加入读者交流群,获取更多干货内容

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-02-14 21:05:39  更:2022-02-14 21:06:01 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/26 13:39:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计