一、登陆后生成token+存储到sqlite
"""
"""
import time
import base64
import hmac
from flask import make_response, request
from web.conf.sql_conf import run_sql
from web.tests.web_user_login.ht_user_login import get_tw_ht_login
__loginTime = 86400
def login():
ht_userName = request.form.get("username")
ht_password = request.form.get("password")
res = get_tw_ht_login(ht_userName, ht_password).json()
token = __generate_token(res['data']['adminName'], __loginTime)
__save_token(ht_userName=res['data']['adminName'], tokens=token)
res['data']['token'] = token
print("登陆信息", res)
return make_response(res)
def __save_token(ht_userName, tokens):
"""
token存储到sqlite
@Args:
ht_userName: str
tokens: str
:param ht_userName: 用户名称
:param tokens: token
:return:
"""
select_username = run_sql.select_sql("select * from login where username='%s'" % ht_userName)
times = int(time.time())
if select_username is None:
sql = "insert into login('token','times','username','starts') values (?,?,?,?)"
values_data = (tokens, times, ht_userName, 1)
run_sql.insert_sql(sql, values_data)
else:
sql = "update login set token=?, times=? where username=?"
values_data = (tokens, times, ht_userName)
run_sql.upDate_sql(sql, values_data)
def __generate_token(key, expire=3600):
"""
@Args:
key: str (用户给定的key,需要用户保存以便之后验证token,每次产生token时的key 都可以是同一个key)
expire: int(最大有效时间,单位为s)(self.__loginTime)
@Return:
state: str
:param key:
:param expire:
:return:
"""
ts_str = str(time.time() + expire)
ts_byte = ts_str.encode("utf-8")
sha1_tshex_str = hmac.new(key.encode("utf-8"), ts_byte, 'sha1').hexdigest()
token = ts_str + ':' + sha1_tshex_str
b64_token = base64.urlsafe_b64encode(token.encode("utf-8"))
return b64_token.decode("utf-8")
二、从web获取请求头内token,并且解析token且校验
(False失败、True通过)
"""
自行生成一套token校验
"""
import time
import base64
import hmac
from flask import *
__fail_res = {"code": 401, "msg": "401 token失效,请重新登陆即可", "data": {}}
def checkToken(res):
def wrapper():
try:
print(request.path)
if request.path == '/login':
return res()
cookiesDict = __get_header_cookies()
print("cookiesDict", cookiesDict)
if cookiesDict['token']:
print("==token校验:", __certify_token(cookiesDict['user_name'], cookiesDict['token']))
if __certify_token(cookiesDict['user_name'], cookiesDict['token']):
return res()
return __fail_res
except Exception as error:
print(error)
return __fail_res
wrapper.__name__ = res.__name__
return wrapper
def __get_header_cookies():
"""
获取vue-header-cookies
:return:
"""
Cookies = request.headers.get("Cookies")
if Cookies is not None:
cookiesDict = {}
for cookieSplit in Cookies.split(';'):
cookiesDict[cookieSplit.split("=", 1)[0]] = cookieSplit.split("=", 1)[1]
return cookiesDict
return Cookies
def __certify_token(key, token):
"""
解析token
@Args:
key: str
token: str
@Returns:
boolean
:param key:
:param token:
:return:
"""
token_str = base64.urlsafe_b64decode(token).decode('utf-8')
token_list = token_str.split(':')
if len(token_list) != 2:
return False
ts_str = token_list[0]
if float(ts_str) < time.time():
return False
known_sha1_tsstr = token_list[1]
sha1 = hmac.new(key.encode("utf-8"), ts_str.encode('utf-8'), 'sha1')
calc_sha1_tsstr = sha1.hexdigest()
if calc_sha1_tsstr != known_sha1_tsstr:
return False
return True
三、装饰器进行拦截校验(@checkToken)
from web import app
from web.tests.web_user_login import ks_user_login
from web.tests.get_my_token import checkToken
@app.route('/login', methods=["POST"])
@checkToken
def login():
"""用户名密码登陆"""
login_data = ks_user_login.login()
return login_data
|