微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

mysql之python客户端封装类

1.直接拿来用

import datetime

import pyMysqL

from dbutils.pooled_db import PooledDB

from conf.env import DATABASE_CONfig


class MysqLClient(object):
    __pool = None

    def __init__(self, MysqL_config):
        """
        :param mincached:连接池中空闲连接的初始数量
        :param maxcached:连接池中空闲连接的最大数量
        :param maxshared:共享连接的最大数量
        :param maxconnections:创建连接池的最大数量
        :param blocking:超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
        :param maxusage:单个连接的最大重复使用次数
        :param host:数据库ip地址
        :param port:数据库端口
        :param database:库名
        :param user:用户名
        :param password:密码
        :param charset:字符编码
        """
        mincached = 10
        maxcached = 20
        maxshared = 10
        maxconnections = 100
        blocking = True
        host = MysqL_config.get("HOST")
        port = MysqL_config.get("PORT")
        database = MysqL_config.get("DATABASE")
        user = MysqL_config.get("USERNAME")
        password = MysqL_config.get("PASSWORD")
        charset = "utf8mb4"

        if not self.__pool:
            self.__class__.__pool = PooledDB(
                pyMysqL,
                mincached,
                maxcached,
                maxshared,
                maxconnections,
                blocking,
                host=host,
                port=port,
                database=database,
                user=user,
                password=password,
                charset=charset,
                cursorclass=pyMysqL.cursors.DictCursor,
            )
        self._conn = None
        self._cursor = None
        self.__get_conn()

    def __del__(self):
        if self._conn and self._cursor:
            self.db_close()

    def __get_conn(self):
        self._conn = self.__pool.connection()
        self._cursor = self._conn.cursor()

    def db_close(self):
        try:
            # self._cursor.close()
            self._conn.close()
        except Exception as e:
            raise e

    def __execute(self, sql, param=()):
        count = self._cursor.execute(sql, param)
        return count

    def __commit(self):
        """提交"""
        try:
            self._conn.commit()
        except Exception as e:
            print("__commit sql error:", e)
            self._conn.rollback()
            raise e

    @staticmethod
    def __dict_datetime_obj_to_str(result_dict):
        """把字典里面的datetime对象转成字符串,使json转换不出错"""
        if result_dict:
            result_replace = {
                k: v.__str__()
                for k, v in result_dict.items()
                if isinstance(v, datetime.datetime)
            }
            result_dict.update(result_replace)
        return result_dict

    def select_one(self, sql, param=()):
        """查询单个结果"""
        self.__execute(sql, param)
        result = self._cursor.fetchone()
        if not result:
            return dict()
        """:type result:dict"""
        result = self.__dict_datetime_obj_to_str(result)
        return result

    def select_many(self, sql, param=()):
        """
        查询多个结果
        :param sql: qsl语句
        :param param: sql参数
        :return: 查询结果集
        """
        self.__execute(sql, param)
        result = self._cursor.fetchall()
        """:type result:list"""
        [self.__dict_datetime_obj_to_str(row_dict) for row_dict in result]
        return result

    def execute_count(self, sql, param=()):
        """返回结果的数量"""
        count = self.__execute(sql, param)
        return count

    def insert(self, sql, param=()):
        """插入行"""
        result = self.__execute(sql, param)
        self.__commit()
        return result

    def insert_dict(self, table, data):
        """通过 dict 插入数据"""
        # 获取一个以键且为逗号分隔的字符串,返回一个字符串
        keys = ",".join(data.keys())
        param = list(data.values())
        s_len = ",".join(["%s"] * len(data))
        sql = f"""insert into {table}(%s) values(%s)"""
        insert_sql = sql % (keys, s_len)
        result = self.__execute(insert_sql, param)
        self.__commit()
        return result

    def update(self, sql, param=()):
        """更新"""
        result = self.__execute(sql, param)
        self.__commit()
        return result

    def batch_update(self, sql, param=()):
        """批量更新"""
        result = self._cursor.executemany(sql, param)
        self.__commit()
        return result

    def delete(self, sql, param=()):
        """删除"""
        result = self.__execute(sql, param)
        self.__commit()
        return result


if __name__ == "__main__":
    mc = MysqLClient(DATABASE_CONfig)
    sql = """UPDATE `bike_ods`.`areafence` SET `enterprise_id`=%s where fence_id=%s """
    result = mc.update(sql, ("113322", "809753148240433152"))
    print(result)
View Code

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐