环境:python3.6+
模块:flask、jwt
目的:实现用于登录并返回token令牌,用于后续的接口权限验证。
前言介绍:
jwt(JSON Web Tokens),在用户认证当中常用的方式,在如今的前后端分离项目当中应用广泛,提高了后端代码的简洁和效能。
传统token和jwt区别
-
传统token:服务端会对登录成功的用户生成一个随机token返回,同时也会在本地保留对应的token(如在数据库中存入:token、用户名、过期时间等),当用户再次访问时,会携带之前的token给服务端进行校验,服务端则通过与本地保留的token进行对比,若寻找到符合条件的token数据,则校验成功 -
jwt验证:服务端会对登录成功的用户生成一个随机token返回,但并不会在服务端本地保留(这是jwt和传统token最大的区别),而当用户再次访问时,服务端会基于jwt对token进行校验和解码(由于jwt是基于base64url编码,因此是可以反向解码的,建议不要在token中存放敏感数据)
JWT 的官方文档:?JSON Web Token Introduction - jwt.io
代码如下:
from flask import g, request, Flask, current_app, jsonify
import jwt
from jwt import exceptions
import functools
import datetime
app = Flask(__name__)
# 处理中文编码
app.config['JSON_AS_ASCII'] = False
# 跨域支持
def after_request(resp):
resp.headers['Access-Control-Allow-Origin'] = '*'
return resp
app.after_request(after_request)
# 构造header
headers = {
'typ': 'jwt',
'alg': 'HS256'
}
# 密钥
SALT = 'iv%i6xo7l8_t9bf_u!8#g#m*)*+ej@bek6)(@u3kh*42+unjv='
def create_token(username, password):
# 构造payload
payload = {
'username': username,
'password': password, # 自定义用户ID
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7) # 超时时间
}
result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers).decode('utf-8')
return result
def verify_jwt(token, secret=None):
"""
检验jwt
:param token: jwt
:param secret: 密钥
:return: dict: payload
"""
if not secret:
secret = current_app.config['JWT_SECRET']
try:
payload = jwt.decode(token, secret, algorithms=['HS256'])
return payload
except exceptions.ExpiredSignatureError: # 'token已失效'
return 1
except jwt.DecodeError: # 'token认证失败'
return 2
except jwt.InvalidTokenError: # '非法的token'
return 3
def login_required(f):
'让装饰器装饰的函数属性不会变 -- name属性'
'第1种方法,使用functools模块的wraps装饰内部函数'
@functools.wraps(f)
def wrapper(*args, **kwargs):
try:
if g.username == 1:
return {'code': 4001, 'message': 'token已失效'}, 401
elif g.username == 2:
return {'code': 4001, 'message': 'token认证失败'}, 401
elif g.username == 2:
return {'code': 4001, 'message': '非法的token'}, 401
else:
return f(*args, **kwargs)
except BaseException as e:
return {'code': 4001, 'message': '请先登录认证.'}, 401
'第2种方法,在返回内部函数之前,先修改wrapper的name属性'
# wrapper.__name__ = f.__name__
return wrapper
@app.before_request
def jwt_authentication():
"""
1.获取请求头Authorization中的token
2.判断是否以 Bearer开头
3.使用jwt模块进行校验
4.判断校验结果,成功就提取token中的载荷信息,赋值给g对象保存
"""
auth = request.headers.get('Authorization')
if auth and auth.startswith('Bearer '):
"提取token 0-6 被Bearer和空格占用 取下标7以后的所有字符"
token = auth[7:]
"校验token"
g.username = None
try:
"判断token的校验结果"
payload = jwt.decode(token, SALT, algorithms=['HS256'])
"获取载荷中的信息赋值给g对象"
g.username = payload.get('username')
except exceptions.ExpiredSignatureError: # 'token已失效'
g.username = 1
except jwt.DecodeError: # 'token认证失败'
g.username = 2
except jwt.InvalidTokenError: # '非法的token'
g.username = 3
@app.route('/')
def hello_world():
return "ok"
# 登录
@app.route('/api/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
data = request.form
type_ = data.get("type")
if type_ == 'login':
username = data.get("username")
password = data.get("password")
# 验证账号密码,正确则返回token,用于后续接口权限验证
if username == "root" and password == "123456":
token = create_token(username, password)
return {"code": 200, "message": "success", "data": {"token": token}}
else:
return {"code": 501, "message": "登陆失败"}
else:
return {"code": 201, "message": "type is false"}
elif request.method == 'GET':
return {"code": 202, "message": "get is nothing"}
else:
return {"code": 203, "message": "'not support other method'"}
# 测试接口
@app.route('/api/test', methods=['GET', 'POST'])
@login_required
def submit_test_info_():
username = g.username
return username
if __name__ == "__main__":
app.run(host='0.0.0.0', port=8090, debug=True)
运行项目后,首先测下登录接口
效果如下:
?
这个时候返回token令牌。
我们先测下没有令牌的情况下访问/api/test测试接口,效果如下:
?
这个时候是无法访问接口的,因为身份信息验证并为通过,
接下来,我们把token令牌添加到请求头中,添加参数:Authorization
效果如下:
?
这个时候可以看出,只要请求头中携带了正确的token口令,才可以访问这个接口。
|