IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 根据CI模式弄的一个python3访问MySql数据库的class -> 正文阅读

[大数据]根据CI模式弄的一个python3访问MySql数据库的class

python 3及以上的。mysql的。可能会有小问题。主要是没时间仔细测试

#!/usr/bin/python

-- coding: utf-8 --

######################

数据库操作类

######################

from sqlite3 import Cursor
import pymysql
import string
import re
import datetime
import time
import traceback
import sys

class mysql:
test_db = {“host”: “localhost”, “user”: “root”,
“passwd”: “*******”, “db”: “adsys”, “port”: “3306”, “charset”: “utf8”}

def __init__(self, conn_config):
    self.conn_config = conn_config
    self.conn = None
    self.cur = None
    self.where_sql = ""
    self.where_params = []

def __del__(self):
    if self.conn != None:
        self.conn.close
        self.conn = None
    if self.where_sql != None and self.where_sql != "":
        self.where_sql = None
        self.where_params = None

def close(self):
    if self.conn != None:
        self.conn.close
        self.conn = None
    if self.where_sql != None and self.where_sql != "":
        self.where_sql = ""
        self.where_params = []

def connection(self):
    if self.conn == None:
        self.conn = pymysql.connect(host=self.conn_config["host"], user=self.conn_config["user"], passwd=self.conn_config["passwd"],
                                    db=self.conn_config["db"], port=string.atoi(self.conn_config["port"]), charset=self.conn_config["charset"])
        self.cur = self.conn.cursor(Cursor="DictCursor")

# 执行SQL并根据SQL访问数据
# 如SQL是select语句则返回全部查询数据
# 如SQL是insert语句则返回插入的ID
# 其它的不返回
def exec_sql(self, sql, args=None, disconnect=True):
    try:
        self.connection()
        if re.search("^\s*SELECT", sql, re.IGNORECASE) != None:
            if args != None:
                self.cur.execute(sql, args)
            else:
                self.cur.execute(sql)
            data = self.cur.fetchall()
            self.conn.commit()
            return data

        if re.search("^\s*INSERT\s+INTO[\s\S]+VALUES", sql, re.IGNORECASE) != None:
            if args != None:
                self.cur.execute(sql, args)
            else:
                self.cur.execute(sql)
            insert_id = self.conn.insert_id()
            self.conn.commit()
            return insert_id

        if args != None:
            self.cur.execute(sql, args)
        else:
            self.cur.execute(sql)
        self.conn.commit()
    except Exception as e:
        if self.conn != None:
            self.conn.rollback()
        raise e
    finally:
        if disconnect and self.conn != None:
            self.conn.close()
            self.conn = None

# 插入[insert]
# input table 表名
# input data 数据的关联字典
# out insert_id
def insert(self, table, data, disconnect=True):
    sql = ""
    try:
        keys = ""
        values = ""
        sqlparams = []
        for key, value in data.items():
            keys += "," + key if len(keys) > 0 else key
            values += ",%s" if len(values) > 0 else "%s"
            if type(value) == datetime.datetime:
                sqlparams.append(value.strftime("%Y-%m-%d %H:%M:%S"))
                continue
            if type(value) == time.struct_time:
                sqlparams.append(time.strftime("%Y-%m-%d %H:%M:%S", value))
                continue
            sqlparams.append(value)
        sql = "INSERT INTO " + table + \
            " (" + keys + ") VALUES (" + values + ")"
        self.connection()
        self.cur.execute(sql, tuple(sqlparams))
        insert_id = self.conn.insert_id()
        self.conn.commit()
        return insert_id
    except Exception as e:
        if self.conn != None:
            self.conn.rollback()
        raise e
    finally:
        if disconnect and self.conn != None:
            self.conn.close()
            self.conn = None
        if self.where_sql != None and self.where_sql != "":
            self.where_sql = ""
            self.where_params = []

# 批量插入[insert_batch]
# input table 表名
# input data 数据的关联元组字典
# out insert_id
def insert_batch(self, table, data, disconnect=True):
    sql = ""
    try:
        keys = ""
        values = ""
        args = range(len(data))
        for key, value in data[0].items():
            keys += "," + key if len(keys) > 0 else key
            values += ",%s" if len(values) > 0 else "%s"
        sql = "INSERT INTO " + table + \
            " (" + keys + ") VALUES (" + values + ")"
        i = 0
        for row in data:
            sqlparams = []
            for key, value in data[i].items():
                if type(value) == datetime.datetime:
                    sqlparams.append(value.strftime("%Y-%m-%d %H:%M:%S"))
                    continue
                if type(value) == time.struct_time:
                    sqlparams.append(time.strftime(
                        "%Y-%m-%d %H:%M:%S", value))
                    continue
                sqlparams.append(value)
            args[i] = tuple(sqlparams)
            i += 1
        self.connection()
        self.cur.executemany(sql, args)
        insert_id = self.conn.insert_id()
        self.conn.commit()
        return insert_id
    except Exception as e:
        if self.conn != None:
            self.conn.rollback()
        raise e
    finally:
        if disconnect and self.conn != None:
            self.conn.close()
            self.conn = None
        if self.where_sql != None and self.where_sql != "":
            self.where_sql = ""
            self.where_params = []

# 获取insert sql string[insert_string]
# input table 表名
# input data 数据的关联字典
def insert_string(self, table, data):
    keys = ''
    values = ''
    for key, value in data.items():
        keys += "," + key if len(keys) > 0 else key
        temp_value = ""
        if type(value) == datetime.datetime:
            temp_value = value.strftime("%Y-%m-%d %H:%M:%S")
            values += "," + temp_value if len(values) > 0 else temp_value
            continue
        if type(value) == time.struct_time:
            temp_value = time.strftime("%Y-%m-%d %H:%M:%S", value)
            values += "," + temp_value if len(values) > 0 else temp_value
            continue
        values += ",'%s'" % ('%s' % value).replace("'", "\\'") if len(
            values) > 0 else "'%s'" % ('%s' % value).replace("'", "\\'")
    sql = "INSERT INTO " + table + \
        " (" + keys + ") VALUES (" + values + ")"
    return sql

# 获取insert sql string ...  on duplicate key update ...  [insert_string]
# input table 表名
# input data 数据的关联字典
# input update_data 可以是字符串也可以是字典
# out sql string
def insert_on_duplicate_key_update_string(self, table, data, update_data=None):
    sql = self.insert_string(table, data)
    update_str = ''
    if update_data == None:
        update_data = data
    update_type = type(update_data)
    if isinstance(update_data, str):
        sql += " ON DUPLICATE KEY UPDATE " + update_data
    else:
        for k, v in update_data.items():
            update_str += "%s = '%s'" % (k, ('%s' % v).replace("'", "\\'")) if len(
                update_str) == 0 else ", %s = '%s'" % (k, ('%s' % v).replace("'", "\\'"))
        sql += " ON DUPLICATE KEY UPDATE " + update_str
    return sql

# where条件[where]默认为and
# input data 条件的关联字典
# 用法:{"字段名 >":"值"}
def where(self, data):
    for key, value in data.items():
        self.where_sql += " AND " + key + \
            " %s" if len(self.where_sql) > 0 else " WHERE " + key + " %s"
    self.where_params.extend(data.values())

# where or 条件[where]
# input data 条件的关联字典
# 用法:{"字段名 >":"值"} data为一组即如果data有多项时 and (a > 0 or b > 0)。如果一项时则 or a > 0
def where_or(self, data):
    data_items = data.items()
    if len(data_items) > 1:
        for key, value in data_items:
            self.where_sql += " OR " + key + \
                " %s" if len(
                    self.where_sql) > 0 else " WHERE ( " + key + " %s"
        self.where_sql += " )"
    else:
        if len(data_items) > 0:
            for key, value in data_items:
                self.where_sql += " OR " + key + \
                    " %s" if len(self.where_sql) > 0 else " WHERE " + key
    self.where_params.extend(data.values())

# 更新[update]
# input table 表名
# input data 数据的关联字典
# input where 条件的关联字典
def update(self, table, data, where=None, disconnect=True):
    sql = ""
    try:
        set_str = ""
        params = data.values()
        for key, value in data.items():
            set_str += ", " + key + \
                " = %s" if len(set_str) > 0 else key + " = %s"
        sql = "UPDATE " + table + " SET " + set_str
        if where != None:
            self.where(where)
        if len(self.where_sql) > 0:
            sql += self.where_sql
        if len(self.where_params) > 0:
            params.extend(self.where_params)
        self.connection()
        self.cur.execute(sql, tuple(params))
        self.conn.commit()
    except Exception as e:
        if self.conn != None:
            self.conn.rollback()
        raise e
    finally:
        if disconnect and self.conn != None:
            self.conn.close()
            self.conn = None
        if self.where_sql != None and self.where_sql != "":
            self.where_sql = ""
            self.where_params = []

# 获取update sql string[update_string]
# input table 表名
# input data 数据的关联字典
# input where 条件的关联字典
# out sql string
def update_string(self, table, data, where):
    set_str = ""
    for key, value in data.items():
        temp_value = ""
        if type(value) == datetime.datetime:
            temp_value = value.strftime("%Y-%m-%d %H:%M:%S")
            set_str += ", " + key + " = " + \
                temp_value if len(set_str) > 0 else key + \
                " = " + temp_value
            continue
        if type(value) == time.struct_time:
            temp_value = time.strftime("%Y-%m-%d %H:%M:%S", value)
            set_str += ", " + key + " = " + \
                temp_value if len(set_str) > 0 else key + \
                " = " + temp_value
            continue
        set_str += ", " + key + \
            " = %s" % value if len(set_str) > 0 else key + " = %s" % value
    sql = "UPDATE " + table + " SET " + set_str
    where_str = ""
    for key, value in where.items():
        where_str += " AND " + key + \
            " = %s" % value if len(
                where_str) > 0 else " WHERE " + key + " = %s" % value
    sql += where_str
    return sql

# 删除[delete]
# input table 表名
# input where 条件的关联字典
def delete(self, table, where=None, disconnect=True):
    sql = ""
    try:
        params = None
        sql = "DELETE FROM " + table
        if where != None:
            self.where(where)
        if len(self.where_sql) > 0:
            sql += self.where_sql
        if len(self.where_params) > 0:
            params = self.where_params
        self.connection()
        self.cur.execute(sql, tuple(params))
        self.conn.commit()
    except Exception as e:
        if self.conn != None:
            self.conn.rollback()
        raise e
    finally:
        if disconnect and self.conn != None:
            self.conn.close()
            self.conn = None
        if self.where_sql != None and self.where_sql != "":
            self.where_sql = ""
            self.where_params = []
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-09-04 01:18:22  更:2022-09-04 01:20:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 0:22:02-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码