IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> mysql 操作封装 -> 正文阅读

[Python知识库]mysql 操作封装

mysql 操作封装

1.同步操作mysql

import pymysql
from sqlalchemy import create_engine
import pandas as pd
class DataBaseHandle(object):

    ''' MySQL 操作类'''

    def __init__(self, host, username, password, database, port):
        '''初始化数据库信息并创建数据库连接'''
        # pymysql
        self.db_conn = pymysql.connect(
            host=host,
            user=username,
            password=password,
            db=database,
            port=port,
            charset="utf8"
        )
        self.cursor = self.db_conn.cursor()

        # sqlalchemy
        self.engine = create_engine(
            'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}'.format(
                user=username,
                passwd=password,
                host=host,
                port=port,
                db=database
            ))

    def select_for_df(self, sql):
        ''' 查询并返回数据帧'''
        try:
            # 将数据转化成数据帧
            return pd.read_sql(sql, con=self.db_conn)
        except Exception as e:
            # print('fetch the data fail')
            print('problem:',e)
            return None

    def append_for_df(self,df,table_name,if_exists='append'):
        ''' 刷新数据-df插入 '''
        # 默认是替换  replace替换、append追加,fail则当表存在时提示ValueError
        df.to_sql(name=table_name, con=self.engine, if_exists=if_exists, index=False)

    def not_select_ifo(self, sql):
        ''' 非查询 '''
        try:
            result = self.cursor.execute(sql)
            self.db_conn.commit()
            return result
        except Exception as e:
            self.db_conn.rollback()
            raise e

    def close(self):
        ''' 关闭连接 '''
        self.cursor.close()
        self.db_conn.close()

2.异步操作mysql

import pandas as pd
import asyncio
import aiomysql
nest_asyncio.apply()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
class AioMysql:

    def __init__(self):
        self.coon = None
        self.pool = None

    async def initpool(self,db,minsize=5,maxsize=10):
        try:
            __pool = await aiomysql.create_pool(
                minsize=minsize,  # 连接池最小值
                maxsize=maxsize,  # 连接池最大值
                host='localhost',
                port=3306,
                user='db',
                password='***',
                db=db,
                autocommit=True,  # 自动提交模式
            )
            return __pool
        except:
            print('connect error')

    async def getCurosr(self):
        conn = await self.pool.acquire()
        # 返回字典格式
        cur = await conn.cursor(aiomysql.DictCursor)
        return conn, cur

    async def select_for_df(self, query, param=None):
        """
        查询操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            #  接收全部的返回结果行,将数据转化成数据帧
            data = await cur.fetchall()
            return pd.DataFrame(data)
        except Exception as e:
            print('---------------->query error')
            raise e
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)

    async def not_select_ifo(self, query, param=None):
        """
        增删改 操作
        :param query: sql语句
        :param param: 参数
        :return:
        """
        conn, cur = await self.getCurosr()
        try:
            await cur.execute(query, param)
            # 这是一个只读属性,并返回执行execute()方法后影响的行数。
            if cur.rowcount == 0:
                return False
            else:
                return True
        except:
            await conn.rollback()
        finally:
            if cur:
                await cur.close()
            # 释放掉conn,将连接放回到连接池中
            await self.pool.release(conn)


class PoolOjb(object):
    ''' 获取连接'''
    @staticmethod
    def connect():
        async def getAmysqlobj(db):
            mysqlobj = AioMysql()
            pool = await mysqlobj.initpool(db)
            mysqlobj.pool = pool
            return mysqlobj
        return getAmysqlobj


#################################### 在其他类中调用
class ClassCountTable(object):
	 sql = "select * from table1 where to_days(now())>=to_days(start_time) and to_days(now())<= to_days(end_time);"
     def __init__(self):
        # 创建数据库连接
        self.db = 'db_name'
        # 获取连接对象
        self.async_conn = PoolOjb.connect()
        self.df = self.get_data()
        
  def get_data(self):
        async def get_df():
            # 根据连接对象获取连接池
            mysqlobj = await self.async_conn(self.db)
            df = await mysqlobj.select_for_df(self.sql)  
            return df
        df = loop.run_until_complete(get_df())
        return df
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-04-24 09:24:12  更:2022-04-24 09:24:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 11:04:12-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计