IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Python: SQLAlchemy、engine、session 与多线程 -> 正文阅读

[大数据]Python: SQLAlchemy、engine、session 与多线程

目录

一、engine 与 connection

1.1 engine

1.2 链接(connection)

二、多线程与SQLAlchemy 中的session

2.1 创建 session

2.2 多线程中使用 session

三、多线程共享 session

3.1 线程间共享 session 易报错

3.2 多线程间通过互斥锁互斥访问 session

小结


一、engine 与 connection

1.1 engine

engine 是用来支持 Python 程序与不同类型数据库进行交互的。其在程序结构中的位置通常如下图:

../_images/sqla_engine_arch.png

?

目前SQLAlchemy 支持几个主流的数据库 engine:

  • PostgreSQL
  • MySQL
  • Oracle
  • Microsoft SQL Server
  • SQLite
  • Others

使用如下代码即可创建一个 engine 对象

from sqlalchemy import create_engine
engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')

在 create_engine 方法中,还支持很多参数,可以通过? help(create_engine) 进行查看,不过本文只介绍三个参数 pool_size、max_overflow 和 pool_timeout:

  • pool_size - 链接池中保持数据库连接的数量
  • max_overflow - 当链接池中的连接数不够用的时候,允许额外再创建的最大链接数量
  • pool_timeout - 排队等数据库链接时的超时时间

pool_size 的默认值是5,max_overflow 的默认值是10,pool_timeout的默认值是30。也就是说,如果这两个值不在调用?create_engine 函数的时候指定,那么默认最多可以创建 5 + 10 = 15个数据库连接。

1.2 链接(connection)

为什么要创建这么多的链接(connection)呢?

因为当程序中有多个线程的时候,链接可能不够用。

当程序中有多个线程的时候,如果每个线程都频繁的请求数据库,那么5个链接可能就不够用了。

因为一个请求数据库的操作,大概需要如下几个步骤:

1、首先需要把SQL请求发送给数据库

2、数据库需要进行SQL的解析和执行

3、然后数据库把数据发送回来

这些步骤都需要时间。如果一个请求需要花费1秒钟,那么2个请求2秒钟,3个请求3秒钟……100个请求100秒。如果线程数量是100,线程池中的 connection 数量是5,那么100个请求需要 100 / 5 = 20 秒才能返回。所以如果这100个线程同时发起查询请求的话,那么必然会有线程需要等待20秒之后才能返回结果。

怎么解决这个问题呢,其中一个解决方案就是创建更多的 connection,你不是100个线程同时发起请求吗?那我就直接创建100个 connection,每个 connection 服务一个线程的查询请求,这样1秒钟内这100个请求就能都返回结果了。

但是,数据库服务端同时响应的请求太多,压力也会很大的,所以一般情况下,数据库服务端都会设置能够接受?connection 数量的上限,PostgreSQL10 中默认的 max_connections 设置就是100。也就是说,默认情况下,最多接受100个 connection ,如果第101个?connection 请求来了,那么就直接拒绝,避免把数据库自己拖垮。

二、多线程与SQLAlchemy 中的session

那么在 SQLAlchemy 中,是如何支持多线程的呢?

2.1 创建 session

在 SQLAlchemy 中,创建 session 的过程如下:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Session = sessionmaker(bind=engine)

sess = Session()

可以看到,SQLAlchemy 中创建 session 只需要三行代码。

使用 session 的方式也很简单:

例如向数据库中添加一条狗的记录:

import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# 创建数据表模型
class TDog(Base):
    __tablename__ = "dog" # 数据库表的名字是 dog

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
    name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
    age = sqlalchemy.Column(sqlalchemy.Integer)

    def __init__(self, name, age):
        self.name = name
        self.age = age


engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

sess = Session()

# 向数据库中添加一行记录
dog = TDog('hello doge', 10)
sess.add(dog)
sess.commit()

然后查询一条狗的记录:

# 从数据库中查询一行记录
Xdog = sess.query(TDog).first()

print(Xdog.id, Xdog.name, Xdog.age)

2.2 多线程中使用 session

在多线程中使用 session 的方式也很简单,直接在每个线程的 run() 方法中创建一个新的?session 就可以了。

import threading
import time
import random
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# 创建数据表模型
class TDog(Base):
    __tablename__ = "dog" # 数据库表的名字是 dog

    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True, autoincrement=True)
    name = sqlalchemy.Column(sqlalchemy.String(256), nullable=True)
    age = sqlalchemy.Column(sqlalchemy.Integer)

    def __init__(self, name, age):
        self.name = name
        self.age = age

engine = create_engine('postgresql://dbuser:dbpassword@dbhost:5432/dbname')
Session = sessionmaker(bind=engine)
Base.metadata.create_all(engine)

sess = Session()

class OpThread(threading.Thread):
    def __init__(self, jobname, Session):
        threading.Thread.__init__(self, daemon=True)
        self.jobname = jobname
        self.Session = Session

    def add_dog(self, sess):
        age = random.randint(10, 20) # 生成一个随机 age,在 10~20 之间
        name = 'doge-' + str(age)
        dog = TDog(name, age)
        sess.add(dog) # 添加一条狗

    def run(self):
        sess = self.Session() # 创建一个新的 session
        while True:
            time.sleep(random.randint(1, 20)) # 随机等待几秒钟,在 1~20 之间
            self.add_dog(sess)

def main():
    sesslist = list()
    for i in range(300): # 创建300个线程
        t = OpThread(str(i), Session)
        t.start()
    while True:
        time.sleep(100)

main()

如上代码创建了300个线程,可想而知,线程池中默认最大的15个链接根本不够用,如果不够用了,那么执行数据库操作的时候,就会被 engine 阻塞,直到 timeout 到达,如果超时时间到了还没排队到自己,那么直接返回超时错误。

raise exc.TimeoutError(
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out, timeout 30.00 (Background on this error at: http://sqlalche.me/e/14/3o7r)

三、多线程共享 session

3.1 线程间共享 session 易报错

那么,如果一个程序里面确实创建了很多个线程,并且不想创建太多的 connection ,以免给数据库服务端造成太大的压力,那有什么办法可以做到吗?有没有一种可能,多个线程共享同一个 session 呢?

原则上是不能的,因为 SQLAlchemy 的 Session在设计的时候就是完全不考虑并行的

如果多个线程共享了同一个session,那么其中一个线程进行数据库操作的时候,必然会影响这个 session 的状态,其它使用这个 session 的线程可能得到错误的数据。

例如A线程在执行 query()?并读取结果数据的时候,B线程执行了add()并执行了 commit,那么,A线程就可能出错:

例如,多线程共享同一个 session 报错的代码演示:

import json
import time
import models
import dbopt
import threading
import uuid
import random
import datetime
from sqlalchemy import func


class OpThread(threading.Thread):
    def __init__(self, jobname, dbsess, op):
        threading.Thread.__init__(self, daemon=True)
        self.jobname = jobname
        self.dbsess = dbsess
        self.operate = op

    def log(self, msg):
        print("{0} - {1}".format(self.jobname, msg))

    def run(self):
        while True:
            time.sleep(1)
            self.log("job running [{0}]".format(self.jobname))
            if 'w' == self.operate:
                self.add_data()
            if 'r' == self.operate:
                self.query_data()

    def add_data(self):
        try:
            uid = str(uuid.uuid4())
            name = self.jobname + '-' + uid
            self.log("add dog which name is : [{0}]".format(name))
            self.dbsess.add(models.TDog(name, uid))
            self.dbsess.commit()
        except Exception as e:
            self.log("in add - {0}".format(str(e)))
            self.dbsess.rollback()

    def query_data(self):
        try:
            dog = self.dbsess.query(models.TDog).order_by(func.random()).offset(0).limit(1).first()
            time.sleep(random.randint(1, 3))
            self.log("modify dog which name is : [{0}]".format(dog.name))
            self.log('in read - {0}'.format(dog.data))
            data = json.loads(dog.data)
        except Exception as e:
            self.log("in modify - {0}".format(str(e)))
            self.dbsess.rollback()

def main():
    sess = dbopt.Sess()
    a = OpThread('write', sess, 'r')
    a.start()
    for i in range(10):
        b = OpThread('read', sess, 'w')
        b.start()

    while True:
        time.sleep(100)

if "__main__" == __name__:
    main()

上述代码在运行过程中,会出现如下错误:

sqlalchemy/engine/base.py:2362: SAWarning: transaction already deassociated from connection

sqlalchemy/orm/session.py:3229: SAWarning: Usage of the 'Session.add()' operation is not currently supported within the execution stage of the flush process. Results may not be consistent. ?Consider using alternative event listeners or connection-level operations instead.

sqlalchemy/orm/session.py:878: SAWarning: Session's state has been changed on a non-active transaction - this state will be discarded.

所以多线程间共享 SQLAlchemy 的 session 是无法正常使用的。

3.2 多线程间通过互斥锁互斥访问 session

那么在线程之间,使用互斥锁,让每个线程在访问 session 的时候,不受其它线程的干扰,可以正常使用吗?

下面是在 session 执行的关键部位加互斥锁的代码:

def add_data(self):
    try:
        uid = str(uuid.uuid4())
        name = self.jobname + '-' + uid 
        self.log("add dog which name is : [{0}]".format(name))
        mutex.acquire() # 加锁
        self.dbsess.add(models.TDog(name, uid))
        self.dbsess.commit()
        mutex.release() # 释放锁
    except Exception as e:
        self.log("in add - {0}".format(str(e)))
        self.dbsess.rollback()


def query_data(self):
    try:
        mutex.acquire() # 加锁
        dog = self.dbsess.query(models.TDog).order_by(func.random()).offset(0).limit(1).first()
        mutex.release() # 释放锁
        time.sleep(random.randint(1, 3)) 
        self.log("query dog which name is : [{0}]".format(dog.name))
        self.log('in read - {0}'.format(dog.data))
        data = json.loads(dog.data)
    except Exception as e:
        self.log("in read - {0}".format(str(e)))
        self.dbsess.rollback()

执行过程中,可能报错如下:

This session is in 'prepared' state; no further SQL can be emitted within this transaction.

sqlalchemy/engine/base.py:2362: SAWarning: transaction already deassociated from connection

原因是,通过?query() 获取对象后,在其它线程使用这个 session 进行操作之前,你没有及时把数据读完了,那么这些数据就被删除了。也就是上面?query_data() 方法中的情况。

那么把互斥锁一直握着不释放,一直到把数据读取完成后,再释放锁呢?这个是没有问题的,但是这就导致了所有涉及到 session 操作的函数,都需要占用互斥锁很长时间,大概率会阻塞其它线程的执行,所以这样的方式虽然可以防止 SQLAlchemy 出错,但是却会导致效率下降,丧失多线程的优势。

小结

如果使用多线程,那么需要注意如下事项:

  1. 每个线程中使用 Session() 创建新的 session - 优势:线程间不会互相干扰,劣势:线程多了会导致创建很多?connection,数据库服务端压力增大
  2. ?线程间共享 session - 优势:节省 connection,劣势:线程间互相干扰,数据出错。
  3. 线程间共享 session 并且使用互斥锁?- 优势:节省 connection ,线程间不会互相干扰;劣势:阻塞其它线程,效率下降

?

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-11-24 08:01:04  更:2021-11-24 08:02:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 8:42:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码