前情提要
- 数据库信息可以在config.ini文件配置并读取
- 通过pymysql对数据库进行连接(具体方法具体封装),代码如下(不废话)
import pymysql
from tools import read_config
# ================封装MySQL基本操作=================
class ReadDB:
def __init__(self):
print("Mysql数据库连接中...")
config = read_config.ReadConfig()
env = config.get_data("Env", "env")
self.host = config.get_data(env, "mysql_host")
self.port = int(config.get_data(env, "mysql_port"))
self.user = config.get_data(env, "mysql_user")
self.password = config.get_data(env, "mysql_password")
self.db = config.get_data(env, "mysql_db")
# 连接数据# port = port # cursorclass=cursors.DictCursor
try:
self.db_conn = pymysql.connect(host = self.host, port = self.port, user = self.user,
password = self.password, db = self.db, charset="utf8")
print("Mysql数据库连接成功")
except pymysql.err.OperationalError as e:
print("Mysql Error %d:%s" % (e.args[0], e.args[1]))
# 查找表数据cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
def select(self, select_sql):
try:
print("执行sql语句:", select_sql)
with self.db_conn.cursor() as cursor:
cursor.execute(select_sql)
print(cursor.execute(select_sql))
self.db_conn.commit()
print("成功查找:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("查找数据失败:", e.args)
def select_where(self, table_name, filed_name, data):
try:
if filed_name != None:
select_sql = "select %s from " + table_name + " where " + filed_name + " = '%s';"
else:
select_sql = "select %s from " + table_name + ";"
print("执行sql语句:", select_sql % data)
with self.db_conn.cursor() as cursor:
cursor.execute(select_sql % data)
self.db_conn.commit()
print("成功查找:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("查找数据失败:", e.args)
# 删除表数据
def delete_where(self, table_name, field_name, data):
try:
delete_sql = "delete from " + table_name + " where " + field_name + " = '%s';"
print("执行sql语句:", delete_sql % data)
with self.db_conn.cursor() as cursor:
cursor.execute(delete_sql % data)
print(cursor.execute(delete_sql % data))
self.db_conn.commit()
print("成功删除:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("删除数据失败:", e.args)
# 插入表数据
def insert(self, table_name, table_data):
try:
for key in table_data:
table_data[key] = "'" + str(table_data[key]) + "'"
key = ','.join(table_data.keys())
value = ",".join(table_data.values())
insert_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")" + ";"
print("执行sql语句:", insert_sql)
with self.db_conn.cursor() as cursor:
cursor.execute(insert_sql)
self.db_conn.commit()
print("成功插入:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("插入数据失败:", e.args)
# 更新数据库数据
def update(self, update_sql):
try:
print("执行sql语句:", update_sql)
with self.db_conn.cursor() as cursor:
cursor.execute(update_sql)
print(cursor.execute(update_sql))
self.db_conn.commit()
print("成功更新:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("更新数据失败:", e.args)
def update_where(self, table_name, filed1_name, field2_name, data):
try:
update_sql = "UPDATE " + table_name + " SET " + filed1_name + " = '%s' where " + field2_name + " = '%s';"
print("执行sql语句:", update_sql % data)
with self.db_conn.cursor() as cursor:
cursor.execute(update_sql % data)
print(cursor.execute(update_sql % data))
self.db_conn.commit()
print("成功更新:", cursor.rowcount, "条数据")
cursor.close()
except Exception as e:
print("更新数据失败:", e.args)
def get_info(self):
print(self.db_conn.get_autocommit(),
self.db_conn.get_host_info(),
self.db_conn.get_server_info(),
self.db_conn.get_proto_info())
# 关闭数据库连接
def close(self):
self.db_conn.close()
print("Mysql关闭成功")
if __name__ == '__main__':
Test = ReadDB()
sql = 'SELECT * FROM {} WHERE xxx= {}'.format("t_account", xxx)
|