| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 网络协议 -> 26.FastAPI安全性 -> 正文阅读 |
|
[网络协议]26.FastAPI安全性 |
26.FastAPI安全性软件开发中,安全是永恒的话题,FastAPI作为一个优秀的Python Web开发框架,为用户提供了多种工具,帮助用户以标准的方式轻松快速地解决软件开发中的安全性。 FastAPI 的 fastapi.security 模块中为各种安全方案提供了一些工具,这些工具简化了这些安全机制的使用方法。 26.1 OAuth2PasswordBearerFastAPI提供的OAuth2PasswordBearer是使用 OAuth2的密码授权模式的Bearer Token(不记名 token) 。创建OAuth2PasswordBearer 实例需要接收URL作为参数。 客户端会向该 URL 通过表单的格式发送 username 和 password 参数,然后得到一个 token 值;OAuth2PasswordBearer 并不会创建相应的 URL 路径操作,只是指明了客户端用来获取 token 的目标 URL。 代码示例: # coding: utf-8 from fastapi import FastAPI from fastapi import Depends from fastapi.security import OAuth2PasswordBearer ? app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ? @app.get(path='/') async def root(token: str = Depends(oauth2_scheme)): ? ?return "Hello world" 在上面的代码中, tokenUrl="token"指的token是相对 URL 。 此时访问,其返回结果: curl http://127.0.0.1:8000 -i HTTP/1.1 401 Unauthorized date: Tue, 08 Feb 2022 09:28:05 GMT server: uvicorn www-authenticate: Bearer content-length: 30 content-type: application/json ? {"detail":"Not authenticated"} 上面的结果表明:访问的内容以及被保护,必须经过授权后才可以访问。 26.2 OAuth2PasswordRequestFormOAuth2PasswordRequestForm是一个用于获取用户和密码的请求表单类, OAuth2规定客户端必须将username和password字段作为表单数据发送,不可使用 JSON 。OAuth2PasswordRequestForm声明的请求表单:
当获取到表单数据后,需要进行密码校验,一般情况下,我们都会考虑使用哈希密码,PassLib 是一个用于处理哈希密码的非常好的 Python 包,它支持许多安全哈希算法以及配合算法使用的实用程序。 pip install passlib 具体passlib的使用方法可以查看其文档Passlib 1.7.4 documentation — Passlib v1.7.4 Documentation 下面的代码示例在上面代码的基础上增加用户登录及Token验证 # coding: utf-8 from fastapi import FastAPI from fastapi import Depends from fastapi import HTTPException from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordRequestForm from passlib.hash import pbkdf2_sha256 ? app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ? # 用户列表 usrs = [ ? { ? ? ? ?'usr_id': 'u000010001', ? ? ? ?'usr_acc': 'abc', ? ? ? ?'usr_pwd': '$pbkdf2-sha256$29000$MTAxMDEwMTg$fUI/40Zxj6.62GHgUX9PbDZ0SybccxZwn3Wl3qJ8U/M', # a1b2c3 ? ? ? ?'salt': '10181010' ? }, ? { ? ? ? ?'usr_id': 'u000010002', ? ? ? ?'usr_acc': 'xyz', ? ? ? ?'usr_pwd': '$pbkdf2-sha256$29000$MTAxMDEwMTg$Eu7mZV80f.tu4RDXSsst9gDjV4fXJ.9S7t1hgcGMMVk', # x1y2z3 ? ? ? ?'salt': '10101018' ? } ] ? # Token校验 async def verify_token(token: str): ? ?found = False ? ?for usr in usrs: ? ? ? ?if usr['usr_id'] == token: ? ? ? ? ? ?found = True ? ? ? ? ? ?break ? ?if not found: ? ? ? ?raise HTTPException(status_code=401) ? ? # 登录并返回Token @app.post(path='/token') async def login(form: OAuth2PasswordRequestForm = Depends()): ? ?foundUsr = None ? ?for usr in usrs: ? ? ? ?if usr['usr_acc'] == form.username: ? ? ? ? ? ?foundUsr = usr ? ? ?if foundUsr is None or pbkdf2_sha256.hash(form.password, salt=usr['salt'].encode()) != foundUsr['usr_pwd']: ? ? ? ?raise HTTPException(status_code=400, detail="Incorrect username or password") ? ?else: ? ? ? ?return {"access_token": foundUsr['usr_id'], "token_type": "bearer"} ? ? @app.get(path='/') async def root(token: str = Depends(oauth2_scheme)): ? ?await verify_token(token) ? ?return "Hello world" 启动应用并执行请求: 测试无效登录: curl -d "username=xyz&password=x1y2z38" -X POST http://127.0.0.1:8000/token -i HTTP/1.1 400 Bad Request date: Tue, 08 Feb 2022 12:16:37 GMT server: uvicorn content-length: 43 content-type: application/json ? {"detail":"Incorrect username or password"} 测试正常登录: curl -d "username=xyz&password=x1y2z3" -X POST http://127.0.0.1:8000/token -i HTTP/1.1 200 OK date: Tue, 08 Feb 2022 12:17:45 GMT server: uvicorn content-length: 51 content-type: application/json ? {"access_token":"u000010002","token_type":"bearer"} 返回token,在Headers中使用token访问: curl -H "Authorization:Bearer u000010002" http://127.0.0.1:8000 -i HTTP/1.1 200 OK date: Tue, 08 Feb 2022 12:19:16 GMT server: uvicorn content-length: 13 content-type: application/json ? "Hello world" 修改token后请求: curl -H "Authorization:Bearer u000010007" http://127.0.0.1:8000 -i HTTP/1.1 401 Unauthorized date: Tue, 08 Feb 2022 12:19:56 GMT server: uvicorn content-length: 25 content-type: application/json ? {"detail":"Unauthorized"} 上面的代码如果去掉 await verify_token(token) 行,则: curl -H "Authorization:Bearer u000010007" http://127.0.0.1:8000 -i 会得到返回结果,原因是默认情况下,OAuth2PasswordBearer只负责请求头中是否具有Authorization:Bearer,如果有就会执行相应的请求,所以,为了验证Token的正确性,需要每个方法都执行相应的验证代码。 本例只作为例子,在实际开发中不会直接拿用户ID作为Token,为了提高系统的安全性,需要使用 JWT。下面我们就介绍 JWT。 26.3 JWT ( JSON Web Tokens )JWT是一个将 JSON 对象编码为密集且没有空格的长字符串的标准。 具体学习和了解 JWT,请参考 https://jwt.io。 需要提到的主要是 JWT中的sub,JWT 的规范中有一个 sub 键,值为该令牌的主题。使用它并不是必须的,但这是我们放置用户标识的地方,所以一般情况下,我们在sub中存放用户ID, 为了避免 ID 冲突,当为创建 JWT 令牌时,可以在 sub 键的值前加上前缀,例如 username:、userid:等。 在 Python 中生成和校验 JWT 令牌 ,可以使用PyJWT,也可以使用 python-jose 。我们在本例中使用 python-jose 来编写代码。 pip install python-jose 使用: from jose import jwt 使用 JWT,需要在系统中添加一个SECRET_KEY变量,用于生成令牌,如: SECRET_KEY='5e9eb66688de11eca78070c94ec87656a649b0cc88de11eca8b470c94ec87656' 以下代码在上面代码的基础上使用 JWT 令牌。 # coding: utf-8 from fastapi import FastAPI from fastapi import Depends from fastapi import HTTPException from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordRequestForm from passlib.hash import pbkdf2_sha256 from datetime import datetime from datetime import timedelta from jose import jwt import json ? app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") ? SECRET_KEY='5e9eb66688de11eca78070c94ec87656a649b0cc88de11eca8b470c94ec87656' ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 1 ? # 用户列表 usrs = [ ? { ? ? ? ?'usr_id': 'u000010001', ? ? ? ?'usr_acc': 'abc', ? ? ? ?'usr_pwd': '$pbkdf2-sha256$29000$MTAxMDEwMTg$fUI/40Zxj6.62GHgUX9PbDZ0SybccxZwn3Wl3qJ8U/M', # a1b2c3 ? ? ? ?'salt': '10181010' ? }, ? { ? ? ? ?'usr_id': 'u000010002', ? ? ? ?'usr_acc': 'xyz', ? ? ? ?'usr_pwd': '$pbkdf2-sha256$29000$MTAxMDEwMTg$Eu7mZV80f.tu4RDXSsst9gDjV4fXJ.9S7t1hgcGMMVk', # x1y2z3 ? ? ? ?'salt': '10101018' ? } ] ? ? # 生成Token def build_access_token(usr: dict): ? ?for_encode = {'sub': json.dumps(usr)} ? ?expire = datetime.utcnow() + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) ? ?for_encode.update({"exp": expire}) ? ?jwt_code = jwt.encode(for_encode, SECRET_KEY, algorithm=ALGORITHM) ? ?return jwt_code ? ? # Token校验 def verify_token(token: str): ? ?try: ? ? ? ?payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) ? ? ? ?print(payload) ? ?except Exception as ex: ? ? ? ?print(str(ex)) ? ? ? ?raise HTTPException(status_code=401) ? ? # 登录并返回Token @app.post(path='/token') async def login(form: OAuth2PasswordRequestForm = Depends()): ? ?foundUsr = None ? ?for usr in usrs: ? ? ? ?if usr['usr_acc'] == form.username: ? ? ? ? ? ?foundUsr = usr ? ? ?if foundUsr is None or pbkdf2_sha256.hash(form.password, salt=usr['salt'].encode()) != foundUsr['usr_pwd']: ? ? ? ?raise HTTPException(status_code=400, detail="Incorrect username or password") ? ?else: ? ? ? ?return {"access_token": build_access_token(foundUsr), "token_type": "bearer"} ? ? @app.get(path='/') async def root(token: str = Depends(oauth2_scheme)): ? ?verify_token(token) ? ?return "Hello world" 与前面的代码差别之处: 1.生成Token的函数:build_access_token 2.校验Token的函数:verify_token 3.登录函数:login 请求测试: 登录: curl -d "username=abc&password=a1b2c3" -X POST http://127.0.0.1:8000/token {"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ7XCJ1c3JfaWRcIjogXCJ1MDAwMDEwMDAxXCIsIFwidXNyX2FjY1wiOiBcImFiY1wiLCBcInVzcl9wd2RcIjogXCIkcGJrZGYyLXNoYTI1NiQyOTAwMCRNVEF4TURFd01UZyRmVUkvNDBaeGo2LjYyR0hnVVg5UGJEWjBTeWJjY3had24zV2wzcUo4VS9NXCIsIFwic2FsdFwiOiBcIjEwMTgxMDEwXCJ9IiwiZXhwIjoxNjQ0NDEzMDI5fQ.oer3oMlf3eN-d6RknIY5Yqjq7SRK4IxXE83ldyrVEw0","token_type":"bearer"} 令牌访问: curl -H "Authorization:Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ7XCJ1c3JfaWRcIjogXCJ1MDAwMDEwMDAxXCIsIFwidXNyX2FjY1wiOiBcImFiY1wiLCBcInVzcl9wd2RcIjogXCIkcGJrZGYyLXNoYTI1NiQyOTAwMCRNVEF4TURFd01UZyRmVUkvNDBaeGo2LjYyR0hnVVg5UGJEWjBTeWJjY3had24zV2wzcUo4VS9NXCIsIFwic2FsdFwiOiBcIjEwMTgxMDEwXCJ9IiwiZXhwIjoxNjQ0NDEzMDI5fQ.oer3oMlf3eN-d6RknIY5Yqjq7SRK4IxXE83ldyrVEw0" http://127.0.0.1:8000 "Hello world" 错误的令牌访问: curl -H "Authorization:Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ7XCJ1c3JfaWRcIjogXCJ1MDAwMDEwMDAxXCIsIFwidXNyX2FjY1wiOiBcImFiY1wiLCBcInVzcl9wd2RcIjogXCIkcGJrZGYyLXNoYTI1NiQyOTAwMCRNVEF4TURFd01UZyRmVUkvNDBaeGo2LjYyR0hnVVg5UGJEWjBTeWJjY3had24zV2wzcUo4VS9NXCIsIFwic2FsdFwiOiBcIjEwMTgxMDEwXCJ9IiwiZXhwIjoxNjQ0NDEzMDI5fQ.oer3oMlf3eN-d6RknIY5Yqjq7SRK4IxXE83l" http://127.0.0.1:8000 -i HTTP/1.1 401 Unauthorized date: Tue, 08 Feb 2022 13:26:57 GMT server: uvicorn content-length: 25 content-type: application/json ? {"detail":"Unauthorized"} 26.4 获取当前用户在大部分应用程序中,当用户访问某个接口API的时候,都需要明确访问者的身份,所以在应用程序中需要随时获取当前用户,由于在 JWT 令牌的 sub 字段中已经保存了用户信息,所以获取当前用户只需要对令牌解码即可。 在上面的代码的基础上,增加两个函数,代码如下: #获取当前用户 def find_current_usr(token: str = Depends(oauth2_scheme)): ? ?current_usr = None ? ?try: ? ? ? ?payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) ? ? ? ?current_usr = json.loads(payload['sub']) ? ?except Exception as ex: ? ? ? ?print(str(ex)) ? ? ? ?raise HTTPException(status_code=401) ? ?if current_usr is None: raise HTTPException(status_code=401) ? ?return current_usr # 获取登录者的信息 @app.get(path='/myinfo') async def myinfo(current_usr: dict = Depends(find_current_usr)): ? ?return current_usr 请求测试: curl -H "Authorization:Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ7XCJ1c3JfaWRcIjogXCJ1MDAwMDEwMDAxXCIsIFwidXNyX2FjY1wiOiBcImFiY1wiLCBcInVzcl9wd2RcIjogXCIkcGJrZGYyLXNoYTI1NiQyOTAwMCRNVEF4TURFd01UZyRmVUkvNDBaeGo2LjYyR0hnVVg5UGJEWjBTeWJjY3had24zV2wzcUo4VS9NXCIsIFwic2FsdFwiOiBcIjEwMTgxMDEwXCJ9IiwiZXhwIjoxNjQ0NDEzMDI5fQ.oer3oMlf3eN-d6RknIY5Yqjq7SRK4IxXE83ldyrVEw0" http://127.0.0.1:8000/myinfo {"usr_id":"u000010001","usr_acc":"abc","usr_pwd":"$pbkdf2-sha256$29000$MTAxMDEwMTg$fUI/40Zxj6.62GHgUX9PbDZ0SybccxZwn3Wl3qJ8U/M","salt":"10181010"} 以上,我们完成了一个简单的安全性示例,FastAPI提供的安全性框架帮助我们节约了很多代码,但在实际开发中,我们常常使用微服务的方式来开发,对于鉴权最好设计独立的微服务进行处理。后面我们会展示一个采用FastAPI开发的鉴权微服务,以便在此基础上进行业务系统的开发。 |
|
网络协议 最新文章 |
使用Easyswoole 搭建简单的Websoket服务 |
常见的数据通信方式有哪些? |
Openssl 1024bit RSA算法---公私钥获取和处 |
HTTPS协议的密钥交换流程 |
《小白WEB安全入门》03. 漏洞篇 |
HttpRunner4.x 安装与使用 |
2021-07-04 |
手写RPC学习笔记 |
K8S高可用版本部署 |
mySQL计算IP地址范围 |
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 | -2025/1/6 20:53:01- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |