mysql 操作封装
1.同步操作mysql
import pymysql
from sqlalchemy import create_engine
import pandas as pd
class DataBaseHandle(object):
''' MySQL 操作类'''
def __init__(self, host, username, password, database, port):
'''初始化数据库信息并创建数据库连接'''
self.db_conn = pymysql.connect(
host=host,
user=username,
password=password,
db=database,
port=port,
charset="utf8"
)
self.cursor = self.db_conn.cursor()
self.engine = create_engine(
'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}'.format(
user=username,
passwd=password,
host=host,
port=port,
db=database
))
def select_for_df(self, sql):
''' 查询并返回数据帧'''
try:
return pd.read_sql(sql, con=self.db_conn)
except Exception as e:
print('problem:',e)
return None
def append_for_df(self,df,table_name,if_exists='append'):
''' 刷新数据-df插入 '''
df.to_sql(name=table_name, con=self.engine, if_exists=if_exists, index=False)
def not_select_ifo(self, sql):
''' 非查询 '''
try:
result = self.cursor.execute(sql)
self.db_conn.commit()
return result
except Exception as e:
self.db_conn.rollback()
raise e
def close(self):
''' 关闭连接 '''
self.cursor.close()
self.db_conn.close()
2.异步操作mysql
import pandas as pd
import asyncio
import aiomysql
nest_asyncio.apply()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class AioMysql:
def __init__(self):
self.coon = None
self.pool = None
async def initpool(self,db,minsize=5,maxsize=10):
try:
__pool = await aiomysql.create_pool(
minsize=minsize,
maxsize=maxsize,
host='localhost',
port=3306,
user='db',
password='***',
db=db,
autocommit=True,
)
return __pool
except:
print('connect error')
async def getCurosr(self):
conn = await self.pool.acquire()
cur = await conn.cursor(aiomysql.DictCursor)
return conn, cur
async def select_for_df(self, query, param=None):
"""
查询操作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
data = await cur.fetchall()
return pd.DataFrame(data)
except Exception as e:
print('---------------->query error')
raise e
finally:
if cur:
await cur.close()
await self.pool.release(conn)
async def not_select_ifo(self, query, param=None):
"""
增删改 操作
:param query: sql语句
:param param: 参数
:return:
"""
conn, cur = await self.getCurosr()
try:
await cur.execute(query, param)
if cur.rowcount == 0:
return False
else:
return True
except:
await conn.rollback()
finally:
if cur:
await cur.close()
await self.pool.release(conn)
class PoolOjb(object):
''' 获取连接'''
@staticmethod
def connect():
async def getAmysqlobj(db):
mysqlobj = AioMysql()
pool = await mysqlobj.initpool(db)
mysqlobj.pool = pool
return mysqlobj
return getAmysqlobj
class ClassCountTable(object):
sql = "select * from table1 where to_days(now())>=to_days(start_time) and to_days(now())<= to_days(end_time);"
def __init__(self):
self.db = 'db_name'
self.async_conn = PoolOjb.connect()
self.df = self.get_data()
def get_data(self):
async def get_df():
mysqlobj = await self.async_conn(self.db)
df = await mysqlobj.select_for_df(self.sql)
return df
df = loop.run_until_complete(get_df())
return df
|