目录
一、engine 与 connection
1.1 engine
1.2 链接(connection)
二、多线程与SQLAlchemy 中的session
2.1 创建 session
2.2 多线程中使用 session
三、多线程共享 session
3.1 线程间共享 session 易报错
3.2 多线程间通过互斥锁互斥访问 session
小结
一、engine 与 connection
1.1 engine
engine 是用来支持 Python 程序与不同类型数据库进行交互的。其在程序结构中的位置通常如下图:
?
目前SQLAlchemy 支持几个主流的数据库 engine:
- PostgreSQL
- MySQL
- Oracle
- Microsoft SQL Server
- SQLite
- Others
使用如下代码即可创建一个 engine 对象
from sqlalchemy import create_engine
engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
在 create_engine 方法中,还支持很多参数,可以通过? help(create_engine) 进行查看,不过本文只介绍三个参数 pool_size、max_overflow 和 pool_timeout:
- pool_size - 链接池中保持数据库连接的数量
- max_overflow - 当链接池中的连接数不够用的时候,允许额外再创建的最大链接数量
- pool_timeout - 排队等数据库链接时的超时时间
pool_size 的默认值是5,max_overflow 的默认值是10,pool_timeout的默认值是30。也就是说,如果这两个值不在调用?create_engine 函数的时候指定,那么默认最多可以创建 5 + 10 = 15个数据库连接。
1.2 链接(connection)
为什么要创建这么多的链接(connection)呢?
因为当程序中有多个线程的时候,链接可能不够用。
当程序中有多个线程的时候,如果每个线程都频繁的请求数据库,那么5个链接可能就不够用了。
因为一个请求数据库的操作,大概需要如下几个步骤:
1、首先需要把SQL请求发送给数据库
2、数据库需要进行SQL的解析和执行
3、然后数据库把数据发送回来
这些步骤都需要时间。如果一个请求需要花费1秒钟,那么2个请求2秒钟,3个请求3秒钟……100个请求100秒。如果线程数量是100,线程池中的 connection 数量是5,那么100个请求需要 100 / 5 = 20 秒才能返回。所以如果这100个线程同时发起查询请求的话,那么必然会有线程需要等待20秒之后才能返回结果。
怎么解决这个问题呢,其中一个解决方案就是创建更多的 connection,你不是100个线程同时发起请求吗?那我就直接创建100个 connection,每个 connection 服务一个线程的查询请求,这样1秒钟内这100个请求就能都返回结果了。
但是,数据库服务端同时响应的请求太多,压力也会很大的,所以一般情况下,数据库服务端都会设置能够接受?connection 数量的上限,PostgreSQL10 中默认的 max_connections 设置就是100。也就是说,默认情况下,最多接受100个 connection ,如果第101个?connection 请求来了,那么就直接拒绝,避免把数据库自己拖垮。
二、多线程与SQLAlchemy 中的session
那么在 SQLAlchemy 中,是如何支持多线程的呢?
2.1 创建 session
在 SQLAlchemy 中,创建 session 的过程如下:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Session = sessionmaker(bind=engine)
sess = Session()
可以看到,SQLAlchemy 中创建 session 只需要三行代码。
使用 session 的方式也很简单:
例如向数据库中添加一条狗的记录:
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# 创建数据表模型
class TDog(Base):
__tablename__ = "dog" # 数据库表的名字是 dog
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
age = sqlalchemy.Column(sqlalchemy.Integer)
def __init__(self, name, age):
self.name = name
self.age = age
engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
sess = Session()
# 向数据库中添加一行记录
dog = TDog('hello doge', 10)
sess.add(dog)
sess.commit()
然后查询一条狗的记录:
# 从数据库中查询一行记录
Xdog = sess.query(TDog).first()
print(Xdog.id, Xdog.name, Xdog.age)
2.2 多线程中使用 session
在多线程中使用 session 的方式也很简单,直接在每个线程的 run() 方法中创建一个新的?session 就可以了。
import threading
import time
import random
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# 创建数据表模型
class TDog(Base):
__tablename__ = "dog" # 数据库表的名字是 dog
id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
age = sqlalchemy.Column(sqlalchemy.Integer)
def __init__(self, name, age):
self.name = name
self.age = age
engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Session = sessionmaker(bind=engine)
Base.metadata.create_all(engine)
sess = Session()
class OpThread(threading.Thread):
def __init__(self, jobname, Session):
threading.Thread.__init__(self, daemon=True)
self.jobname = jobname
self.Session = Session
def add_dog(self, sess):
age = random.randint(10, 20) # 生成一个随机 age,在 10~20 之间
name = 'doge-' + str(age)
dog = TDog(name, age)
sess.add(dog) # 添加一条狗
def run(self):
sess = self.Session() # 创建一个新的 session
while True:
time.sleep(random.randint(1, 20)) # 随机等待几秒钟,在 1~20 之间
self.add_dog(sess)
def main():
sesslist = list()
for i in range(300): # 创建300个线程
t = OpThread(str(i), Session)
t.start()
while True:
time.sleep(100)
main()
如上代码创建了300个线程,可想而知,线程池中默认最大的15个链接根本不够用,如果不够用了,那么执行数据库操作的时候,就会被 engine 阻塞,直到 timeout 到达,如果超时时间到了还没排队到自己,那么直接返回超时错误。
raise exc.TimeoutError( sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: http://sqlalche.me/e/14/3o7r)
三、多线程共享 session
3.1 线程间共享 session 易报错
那么,如果一个程序里面确实创建了很多个线程,并且不想创建太多的 connection ,以免给数据库服务端造成太大的压力,那有什么办法可以做到吗?有没有一种可能,多个线程共享同一个 session 呢?
原则上是不能的,因为 SQLAlchemy 的 Session在设计的时候就是完全不考虑并行的。
如果多个线程共享了同一个session,那么其中一个线程进行数据库操作的时候,必然会影响这个 session 的状态,其它使用这个 session 的线程可能得到错误的数据。
例如A线程在执行 query()?并读取结果数据的时候,B线程执行了add()并执行了 commit,那么,A线程就可能出错:
例如,多线程共享同一个 session 报错的代码演示:
import json
import time
import models
import dbopt
import threading
import uuid
import random
import datetime
from sqlalchemy import func
class OpThread(threading.Thread):
def __init__(self, jobname, dbsess, op):
threading.Thread.__init__(self, daemon=True)
self.jobname = jobname
self.dbsess = dbsess
self.operate = op
def log(self, msg):
print("{0} - {1}".format(self.jobname, msg))
def run(self):
while True:
time.sleep(1)
self.log("job running [{0}]".format(self.jobname))
if 'w' == self.operate:
self.add_data()
if 'r' == self.operate:
self.query_data()
def add_data(self):
try:
uid = str(uuid.uuid4())
name = self.jobname + '-' + uid
self.log("add dog which name is : [{0}]".format(name))
self.dbsess.add(models.TDog(name, uid))
self.dbsess.commit()
except Exception as e:
self.log("in add - {0}".format(str(e)))
self.dbsess.rollback()
def query_data(self):
try:
dog = self.dbsess.query(models.TDog).order_by(func.random()).offset(0).limit(1).first()
time.sleep(random.randint(1, 3))
self.log("modify dog which name is : [{0}]".format(dog.name))
self.log('in read - {0}'.format(dog.data))
data = json.loads(dog.data)
except Exception as e:
self.log("in modify - {0}".format(str(e)))
self.dbsess.rollback()
def main():
sess = dbopt.Sess()
a = OpThread('write', sess, 'r')
a.start()
for i in range(10):
b = OpThread('read', sess, 'w')
b.start()
while True:
time.sleep(100)
if "__main__" == __name__:
main()
上述代码在运行过程中,会出现如下错误:
sqlalchemy/engine/base.py:2362: SAWarning: transaction already deassociated from connection
sqlalchemy/orm/session.py:3229: SAWarning: Usage of the 'Session.add()' operation is not currently supported within the execution stage of the flush process. Results may not be consistent. ?Consider using alternative event listeners or connection-level operations instead.
sqlalchemy/orm/session.py:878: SAWarning: Session's state has been changed on a non-active transaction - this state will be discarded.
所以多线程间共享 SQLAlchemy 的 session 是无法正常使用的。
3.2 多线程间通过互斥锁互斥访问 session
那么在线程之间,使用互斥锁,让每个线程在访问 session 的时候,不受其它线程的干扰,可以正常使用吗?
下面是在 session 执行的关键部位加互斥锁的代码:
def add_data(self):
try:
uid = str(uuid.uuid4())
name = self.jobname + '-' + uid
self.log("add dog which name is : [{0}]".format(name))
mutex.acquire() # 加锁
self.dbsess.add(models.TDog(name, uid))
self.dbsess.commit()
mutex.release() # 释放锁
except Exception as e:
self.log("in add - {0}".format(str(e)))
self.dbsess.rollback()
def query_data(self):
try:
mutex.acquire() # 加锁
dog = self.dbsess.query(models.TDog).order_by(func.random()).offset(0).limit(1).first()
mutex.release() # 释放锁
time.sleep(random.randint(1, 3))
self.log("query dog which name is : [{0}]".format(dog.name))
self.log('in read - {0}'.format(dog.data))
data = json.loads(dog.data)
except Exception as e:
self.log("in read - {0}".format(str(e)))
self.dbsess.rollback()
执行过程中,可能报错如下:
This session is in 'prepared' state; no further SQL can be emitted within this transaction.
sqlalchemy/engine/base.py:2362: SAWarning: transaction already deassociated from connection
原因是,通过?query() 获取对象后,在其它线程使用这个 session 进行操作之前,你没有及时把数据读完了,那么这些数据就被删除了。也就是上面?query_data() 方法中的情况。
那么把互斥锁一直握着不释放,一直到把数据读取完成后,再释放锁呢?这个是没有问题的,但是这就导致了所有涉及到 session 操作的函数,都需要占用互斥锁很长时间,大概率会阻塞其它线程的执行,所以这样的方式虽然可以防止 SQLAlchemy 出错,但是却会导致效率下降,丧失多线程的优势。
小结
如果使用多线程,那么需要注意如下事项:
- 每个线程中使用 Session() 创建新的 session - 优势:线程间不会互相干扰,劣势:线程多了会导致创建很多?connection,数据库服务端压力增大
- ?线程间共享 session - 优势:节省 connection,劣势:线程间互相干扰,数据出错。
- 线程间共享 session 并且使用互斥锁?- 优势:节省 connection ,线程间不会互相干扰;劣势:阻塞其它线程,效率下降
?
|