1. 数据库模块 my_sql.py
from datetime import datetime

from sqlalchemy import create_engine,text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, VARCHAR, DateTime
# Declarative base class that every ORM model in this module derives from.
Base = declarative_base()

# Engine for the local MySQL schema `my_sql`, reached through the PyMySQL
# driver. `max_overflow=5` is forwarded to the engine's connection pool.
# NOTE(review): credentials are embedded in the URL; confirm this is only
# meant for local development.
_DB_URL = "mysql+pymysql://root:root@127.0.0.1/my_sql?charset=utf8"
db = create_engine(
    _DB_URL,
    max_overflow=5,
)

# Session factory bound to the engine, plus one module-level session object
# that other modules import and share.
Session = sessionmaker(bind=db)
sessionsql = Session()
class Users(Base):
    """ORM model for one row of the ``admin_info`` table (an account record).

    Column defaults are applied by SQLAlchemy on INSERT when the caller does
    not supply a value.
    """

    __tablename__ = 'admin_info'
    __table_args__ = {
        "mysql_charset": "utf8"
    }

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Login name; the unique constraint forbids duplicate accounts.
    name = Column(String(32), unique=True)
    # NOTE(review): the rest of the file stores and compares this value as
    # plain text; switching to a salted hash needs a wider column and a
    # data migration, so it is only flagged here.
    password = Column(String(32))
    # Integer defaults are now real integers (they were the strings "1"),
    # so the Python-side value matches the column type.
    # presumably an account-category code -- TODO confirm its meaning.
    type = Column(Integer, default=1)
    email = Column(VARCHAR(255))
    # presumably 1 = account enabled -- TODO confirm.
    enable = Column(Integer, default=1)
    # Default to the actual insertion time. The previous default was the
    # fixed literal "2021-10-1 10:10:10", which stamped every row that
    # omitted create_time with the same past moment.
    create_time = Column(DateTime, default=datetime.now)
def drop_db():
    """Drop every table registered on ``Base.metadata`` using the ``db`` engine."""
    metadata = Base.metadata
    metadata.drop_all(db)
def init_db():
    """Create all mapped tables and seed the default ``admin`` account.

    The function can be called repeatedly: the seed row is only inserted
    when no account named ``admin`` exists yet, so a second run no longer
    fails on the unique constraint of ``admin_info.name``.

    Raises:
        Exception: any error raised while querying or committing is
            re-raised after the shared session has been rolled back.
    """
    Base.metadata.create_all(db)
    try:
        existing = (
            sessionsql.query(Users.id)
            .filter(Users.name == "admin")
            .first()
        )
        if existing is None:
            obj = Users(
                name="admin",
                password='admin',
                type=1,
                email="admin@qq.com",
                enable=1,
                create_time='2021-11-11 13:13:13',
            )
            sessionsql.add(obj)
            sessionsql.commit()
    except Exception:
        # Leave the shared session usable for the next caller, then let the
        # failure surface instead of hiding it.
        sessionsql.rollback()
        raise
    finally:
        # Always release the connection, including on failure.
        sessionsql.close()
# Script entry point: executing this module directly runs init_db().
if __name__ == "__main__":
    init_db()
2. 登录界面 + 账号判定 + 退出登录
from flask_login import LoginManager
from sqlalchemy import text,func
from my_sql import sessionsql,Users
from flask_login import login_user,current_user,login_required,logout_user,UserMixin
# Secret key stored in the Flask app configuration.
# NOTE(review): the key is committed in source; consider loading it from the
# environment or a secrets store in deployment.
app.config.update(SECRET_KEY='aixieshaxieshayuefuzasdaawayuehao%$')

# Flask-Login manager attached to the app. Passing the app to the constructor
# performs the same registration as a separate init_app(app) call.
login_manager = LoginManager(app)
# Name of the view used as the login page.
login_manager.login_view = 'login'
class User(UserMixin):
    """Flask-Login user object; all behaviour is inherited from ``UserMixin``.

    Callers create an empty instance and assign its ``id`` attribute.
    """
@login_manager.user_loader
def user_loader(user_name):
    """Rebuild the session user from the id stored by Flask-Login.

    Args:
        user_name: the value previously assigned to ``User.id`` (the account
            name).

    Returns:
        A ``User`` whose ``id`` is ``user_name`` when an account with exactly
        that name exists in ``admin_info``; otherwise ``None``. ``None`` is
        also returned when the database lookup fails, so a DB error is
        treated as "not logged in" instead of crashing with ``NameError``
        on the unbound ``users`` variable as before.
    """
    import logging

    from sqlalchemy.exc import SQLAlchemyError

    if not user_name:
        return None

    try:
        # Ask the database for this one account instead of loading every
        # user name and scanning the list in Python.
        row = (
            sessionsql.query(Users.name)
            .filter(Users.name == user_name)
            .first()
        )
    except SQLAlchemyError:
        sessionsql.rollback()
        logging.getLogger(__name__).exception(
            "user lookup failed for %r", user_name
        )
        return None
    finally:
        sessionsql.close()

    # Re-check in Python so the match stays exact even when the database
    # collation compares names case-insensitively.
    if row is None or row[0] != user_name:
        return None

    user = User()
    user.id = user_name
    return user
@login_manager.request_loader
def request_loader(request):
    """Authenticate a single request from its ``uname`` / ``upwd`` form fields.

    Args:
        request: the current Flask request.

    Returns:
        A ``User`` whose ``id`` is the submitted name when the account exists
        and the submitted password equals the stored one; otherwise ``None``
        (missing fields, unknown account, wrong password, or a database
        error).

    Fixes relative to the previous version:
      * the account check compared a ``str`` against a list of row tuples,
        so it never matched and the function always returned ``None``;
      * the submitted password was never read -- the stored password row was
        compared against the list that contained it;
      * ``is_authenticated`` is a read-only property on ``UserMixin`` and
        cannot be assigned; rejecting bad credentials is now expressed by
        returning ``None``;
      * a failed query left ``pass_word`` unbound (``NameError``).
    """
    import logging

    from sqlalchemy.exc import SQLAlchemyError

    user_name = request.form.get('uname')
    # Same form field the login view reads for the password.
    submitted_password = request.form.get('upwd')
    if not user_name or submitted_password is None:
        return None

    try:
        row = (
            sessionsql.query(Users.name, Users.password)
            .filter(Users.name == user_name)
            .first()
        )
    except SQLAlchemyError:
        sessionsql.rollback()
        logging.getLogger(__name__).exception(
            "credential lookup failed for %r", user_name
        )
        return None
    finally:
        sessionsql.close()

    # Exact (case-sensitive) name match, independent of the DB collation.
    if row is None or row[0] != user_name:
        return None

    # NOTE(review): passwords are stored and compared in plain text here;
    # hashing them requires a schema/data migration beyond this function.
    if row[1] != submitted_password:
        return None

    user = User()
    user.id = user_name
    return user
@app.route('/login', methods=('GET', 'POST'))
def login():
    """Show the login form (GET) or check submitted credentials (otherwise).

    Reads the ``uname`` and ``upwd`` form fields. On success the user is
    logged in through Flask-Login and ``home.html`` is rendered; on failure
    ``login.html`` is rendered again with a ``msg`` explaining why.

    Changes relative to the previous version:
      * one filtered query replaces loading every user name and then running
        a second query for the password;
      * a database error no longer ends in ``NameError`` on the unbound
        ``users`` variable -- it is rolled back, logged and answered with
        the retry message;
      * a missing ``upwd`` field can no longer match an account whose
        stored password is NULL.
    """
    import logging

    from sqlalchemy.exc import SQLAlchemyError

    if request.method == 'GET':
        return render_template("login.html")

    user_name = request.form.get('uname')
    upwd = request.form.get('upwd')

    if not user_name:
        return render_template('login.html', msg="账号不存在")

    try:
        row = (
            sessionsql.query(Users.name, Users.password)
            .filter(Users.name == user_name)
            .first()
        )
    except SQLAlchemyError:
        sessionsql.rollback()
        logging.getLogger(__name__).exception(
            "login lookup failed for %r", user_name
        )
        return render_template('login.html', msg='密码错误,请重试!!!')
    finally:
        sessionsql.close()

    # Exact (case-sensitive) name match, independent of the DB collation.
    if row is None or row[0] != user_name:
        return render_template('login.html', msg="账号不存在")

    # NOTE(review): plain-text password comparison; see the Users model.
    if upwd is None or row[1] != upwd:
        return render_template('login.html', msg="密码错误")

    user = User()
    user.id = user_name
    login_user(user)
    return render_template('home.html', user=current_user.get_id())
@app.route('/home',methods=('GET', 'POST'))
@login_required
def home():
    """Render ``home.html`` for the logged-in user, passing their id as ``user``."""
    signed_in_as = current_user.get_id()
    return render_template('home.html', user=signed_in_as)
@app.route('/sign_out',methods=['get'])
def sign_out():
    """Log the current user out and redirect to the ``login`` view."""
    logout_user()
    login_url = url_for('login')
    return redirect(login_url)
以上代码为本人原创,部分地方仍有优化空间,欢迎评论交流;转载请注明出处,谢谢。
|