IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> pgsql连接池的实现 -> 正文阅读

[大数据]pgsql连接池的实现

连接池的作用及原理

正常访问数据库的过程中,每次访问都需要创建数据库的连接,这会消耗大量的资源;连接池的就是为数据库连接建立一个“缓冲区”,预先在缓冲池中放入一定数量的连接对象,当需要建立数据库连接时,只需从“缓冲池”中取出一个,使用完毕之后再放回去;且连接池允许多个客户端使用缓存起来的连接对象,这些对象可以连接数据库,它们是共享的、可被重复使用的;使用连接池可以节省大量资源,提高程序运行速度。

连接池的基本原理是:先初始化一定的数据库连接对象,并且把这些连接保存在连接池中。这些数据库连接的数量是由最小数据库连接数来设定的。连接池的最大数据库连接数量限定了这个连接池能占有的最大连接数,当应用程序向连接池请求的连接数超过最大连接数量时,这些请求将被加入到等待队列中。当程序需要访问数据库的时候,如果连接池中有空闲的连接,可直接得到一个连接;如果连接池对象中没有空闲的连接,且连接数没有达到最大,会创建一个新的连接从连接池中取出一个连接,数据库操作结束后,再把这个用完的连接重新放回连接池。

Python连接PostgreSql

首先安装 psycopg2模块,pycopg2是Python编程语言的PostgreSQL数据库的适配器。

pip install psycopg2

查阅psycopg2官方手册可以找到psycopg2.pool连接池

对PostgreSql连接池进行封装,代码如下:

#!/usr/bin/env python
# -*- coding=utf-8 -*-
import traceback
from traceback import format_exc

import psycopg2
from DBUtils.PooledDB import PooledDB
from psycopg2.extras import RealDictCursor

from config.conf import PG_CONFIG
from logger.log import savelog

class PostgreSqlPool(object):
    """单例模式PostgreSql数据库连接池"""

    # __instance_lock = threading.Lock()
    #
    # def __new__(cls, *args, **kwargs):
    #     if not hasattr(PostgreSqlPool, "_instance"):
    #         with cls.__instance_lock:
    #             cls._instance = super(PostgreSqlPool, cls).__new__(cls, *args, **kwargs)
    #     return cls._instance

    def __init__(self, config, maxcached, cursor_factory=None):
        self.__psycopg_pool = None
        self.__CONFIG = config
        self.__maxcached = maxcached
        self.cursor_factory = cursor_factory

    def _get_pool(self):
        """
        创建连接池,retry三次
        :return:
        """
        for i in range(3):
            if self.__psycopg_pool is not None:
                break
            try:
                self.__psycopg_pool = PooledDB(creator=psycopg2, mincached=1, maxcached=self.__maxcached,
                                               cursor_factory=self.cursor_factory, **self.__CONFIG)
                return
            except Exception as e:
                self.__psycopg_pool = None
                self._deal_with_error(e)
            if self.__psycopg_pool is None and i == 2:
                raise Exception("Failed to Create Psycopg_pool")

    def _connect(self):
        """连接数据库"""
        if self.__psycopg_pool is None:
            self._get_pool()
        try:
            self.conn = self.__psycopg_pool.connection()
            self.cursor = self.conn.cursor()
        except Exception as e:
            self._deal_with_error(e)
            raise Exception("Failed to Connect to Database")

    def _disconnect(self):
        """释放数据库连接"""
        try:
            self.cursor.close()
            self.conn.close()
        except Exception as e:
            self._deal_with_error(e)

    def _execute(self, sql, param):
        """执行sql语句"""
        try:
            self._connect()
            self.cursor.execute(sql, param)
            self.conn.commit()
            result = self.cursor.rowcount
        except Exception as e:
            self.conn.rollback()
            savelog.error("error sql: {}".format(sql))
            savelog.error("error data: {}".format(param))
            self._deal_with_error(e)
            result = None
        finally:
            self._disconnect()
        return result

    def get_conn(self):
        """
        获取连接池的连接
        :return:
        """
        if self.__psycopg_pool is None:
            self._get_pool()
        return self.__psycopg_pool.connection()

    def get_one(self, sql, param=None):
        """
        查询一条数据
        :param sql:
        :param param: list or tuple
        :return: 查询结果 --> dict
        """
        try:
            self._connect()
            self.cursor.execute(sql, param)
            result = self.cursor.fetchone()
        except Exception as e:
            savelog.error("error sql: %s" % sql)
            self._deal_with_error(e)
            result = None
        finally:
            self._disconnect()
        return result

    def get_all(self, sql, param=None):
        """
        查询所有数据
        :param sql:
        :param param: list or tuple
        :return: 查询结果 --> [dict, dict, ...]
        """
        try:
            self._connect()
            self.cursor.execute(sql, param)
            result = self.cursor.fetchall()
        except Exception as e:
            savelog.error("error sql: %s" % sql)
            self._deal_with_error(e)
            result = None
        finally:
            self._disconnect()

        return result

    def insert_one(self, sql, param=None):
        """
        插入一条数据
        :param sql:
        :param param: list or tuple
        :return: 受影响的行数 --> int
        """
        return self._execute(sql, param)

    def insert_many(self, sql, param=None):
        """
        插入多条数据
        :param sql:
        :param param: [(v1,v2...), (v1,v2...)]
        :return: 受影响的行数 --> int
        """
        try:
            self._connect()
            self.cursor.executemany(sql, param)
            self.conn.commit()
            result = self.cursor.rowcount
        except Exception as e:
            savelog.error("error sql: %s" % sql)
            self.conn.rollback()
            self._deal_with_error(e)
            result = None
        finally:
            self._disconnect()
        return result

    def insert_and_get(self, sql, param=None):
        """
        插入数据的同时,获取数据
        :param sql:
        :param param:
        :return:
        """
        try:
            self._connect()
            self.cursor.execute(sql, param)
            self.conn.commit()
            result = self.cursor.fetchone()
        except Exception as e:
            savelog.error("error sql: %s" % sql)
            self.conn.rollback()
            self._deal_with_error(e)
            result = None
        finally:
            self._disconnect()
        return result

    def delete(self, sql, param=None):
        """
        删除
        :param sql:
        :param param:
        :return: 受影响的行数 --> int
        """
        return self._execute(sql, param)

    def update(self, sql, param=None):
        """
        更新
        :param sql:
        :param param:
        :return: 受影响的行数 --> int
        """
        return self._execute(sql, param)

    def do(self, sql, param=None):
        """
        执行其他sql
        :param sql:
        :param param: list or tuple
        :return:
        """
        return self._execute(sql, param)

    @staticmethod
    def transaction_do(param_list):
        """
        事务操作,伴随更新和插入
        :param param_list:
        :return:
        """
        conn, cur, count = None, None, None
        try:
            conn = mdb.get_conn()
            cur = conn.cursor()
            for param_dict in param_list:
                single, sql, param = param_dict.get('single'), param_dict.get('sql'), param_dict.get('param')
                if single:
                    cur.execute(sql, param)
                else:
                    cur.executemany(sql, param)
            count = cur.rowcount
            conn.commit()
        except Exception:
            savelog.error('transaction_do data_into_db error:{}'.format(traceback.format_exc()))
            count = None
            if conn:
                conn.rollback()
        finally:
            if cur:
                cur.close()
            if conn:
                conn.close()
        return count

    @staticmethod
    def _deal_with_error(e):
        savelog.error(format_exc())

db = PostgreSqlPool(PG_CONFIG, 30, RealDictCursor)

if __name__ == '__main__':
    pass

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-19 01:14:20  更:2022-02-19 01:15:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 10:53:08-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码