mysql连接
1. 不使用连接池
我们知道使用pymysql连接数据库一般需要下面的步骤:
-
- 创建连接对象(一般使用connect或者是连接池)
-
- 创建游标对象
-
- 使用游标执行代码
-
- 使用游标获取结果并返回 # 返回的是元组(每个实例一个元组)!
-
- 关闭游标和连接
conn = pymysql.connect(host, port, ...)
cursor = conn.cursor()
sql = "select * from goods;"
cursor.execute(sql)
res = cursor.fetchall()
print(res)
cursor.close()
conn.close()
2. 使用连接池
2.1 为什么需要连接池
连接池是connection对象的缓冲区,他里面会存放一些connection,当程序需要使用connection时,如果连接池中有,则直接从连接池获取,不需要再重新创建connection。连接池让程序能够复用连接。资源复用. 可以尽量避免连接三次握手四次挥手等非业务流程带来的损耗。池化技术,本质上说就是资源复用。
2.2 连接池管理
1、连接池维护着两个容器空闲池和活动池
2、空闲池用于存放未使用的连接,活动池存放正在使用的连接,活动池中的连接使用完之后要归还回空闲池
3、当需要连接时,先判断空闲池是否有连接,如果有则取出一个放置到活动池供程序使用
4、如果没有,则判断活动池中连接是否达到最大连接数,如果没有,则创建一个连接放到活动池供程序使用。
5、如果空闲池中没有连接,活动池中连接也达到上限,则不能创建新连接,此时会判断是否等待超时,如果没有等待超时则需等待活动池中的连接归还回空闲池
6、如果等待超时,可以采取多种处理方式,例如:直接抛出异常,或将活动池中使用最久的连接移除掉归还回空闲池以供程序使用
2.3 连接池数量确定
参考
首先解决一个问题,为什么需要多个连接?对于数据库而言,每个连接都是独占的,如果一个业务占用连接,而又进入了IO等待,那么这个业务就会一直占用这个连接,其他业务也只能等待。如果开多条连接,不同的业务就可以利用不同的连接通道来处理。 一般情况下,线程池线程数量尽量与连接池中的连接数量一致。这样做可以避免不同线程之间抢资源或是资源过多线程处理不过来导致浪费的情况。注意这里说的一般情况,是业务服务器与数据库服务器性能差不多的情况。其实,数据库服务器也是使用多线程来处理业务,如果连接数过多,数据库会处理不过来。 另外,如果不使用连接池,就需要不同的线程去绑定不同的连接,使得整个系统的耦合度变高。还要注意一点的是,在一个数据库服务器中可能有多个连接池。比如,一个连接池对应写业务,与一个只处理写业务的线程池,共同处理写数据库,而再用另一套连接池只处理读业务,与只读数据库连接,这样做就实现了读写分离。
2. 步骤
-
- 获取锁
-
- 获取池子(可以定义最大连接数,常规的mysql连接参数等)
-
- 使用池子连接
-
- 定义游标
-
- 使用游标执行代码
-
- 使用游标获取结果并返回
-
- 关闭连接
import threading
from typing import List
import pymysql
from dbutils.pooled_db import PooledDB
from MovieRecSystem.config import redis_config, mysql_config
from MovieRecSystem.utils.logger_util import logger
def _encode(v, encoding='UTF-8') -> str:
return str(v, encoding=encoding)
'''
单例是redis, 然后提供静态方法
可以使用其它的
'''
class DB(object):
__lock = threading.Lock()
__pool = None
def __init__(self):
self.pool = DB.__get_conn_pool()
@staticmethod
def __get_conn_pool():
'''
就是获取一个连接池(因为是单例), 获取锁之后再添加实例(和redis一样)
但是不代表已经连接成功了,还需要connection
:return:
'''
if DB.__pool is None:
DB.__lock.acquire()
try:
if DB.__pool is None:
_cfg = mysql_config.cfg.copy()
if 'creator' not in _cfg:
raise ValueError("必须给定creator参数")
if _cfg['creator'] != 'pymysql':
raise ValueError("creator参数必须给定为pymysql")
del _cfg['creator']
DB.__pool = PooledDB(pymysql, **_cfg)
except Exception as e:
raise ValueError("创建Mysql数据库连接池异常!") from e
finally:
DB.__lock.release()
return DB.__pool
def _get_connection(self):
conn = self.pool.connection()
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
return conn, cursor
@staticmethod
def _close_connection(conn, cursor):
if cursor:
try:
cursor.close()
except Exception as e:
logger.error("关闭Mysql连接cursor异常。", exc_info=e)
if conn:
try:
conn.close()
except Exception as e:
logger.error("关闭Mysql连接conn异常。", exc_info=e)
def query_sql(self, sql, **params):
conn, cursor = self._get_connection()
try:
cursor.execute(sql, params)
result = cursor.fetchall()
except Exception as e:
raise ValueError(f"Query查询异常,当前sql语句为:{sql}, 参数类别为:{params}.") from e
finally:
self._close_connection(conn, cursor)
return result
def execute_sql(self, sql, **params):
conn, cursor = self._get_connection()
try:
cursor.execute(sql, params)
result = cursor.lastrowid
conn.commit()
except Exception as e:
conn.rollback()
raise ValueError(f"Execute执行异常,当前sql语句为:{sql}, 参数类别为:{params}.") from e
finally:
self._close_connection(conn, cursor)
return result
|