tuoheng_algN/util/MyConnectionPool.py

228 lines
7.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: UTF-8 -*-
import pymysql
from loguru import logger
from dbutils.pooled_db import PooledDB
"""
@功能:创建数据库连接池
"""
class MyConnectionPool(object):
__pool = None
def __init__(self, content):
self.conn = self.__getConn(content)
self.cursor = self.conn.cursor()
# 创建数据库连接conn和游标cursor
# def __enter__(self):
# self.conn = self.__getconn()
# self.cursor = self.conn.cursor()
# 创建数据库连接池
def __getconn(self, content):
if self.__pool is None:
self.__pool = PooledDB(
creator=pymysql,
mincached=int(content["mysql"]["db_min_cached"]),
maxcached=int(content["mysql"]["db_max_cached"]),
maxshared=int(content["mysql"]["db_max_shared"]),
maxconnections=int(content["mysql"]["db_max_connecyions"]),
blocking=content["mysql"]["db_blocking"],
maxusage=content["mysql"]["db_max_usage"],
setsession=content["mysql"]["db_set_session"],
host=content["mysql"][content["dsp"]["active"]]["host"],
port=content["mysql"][content["dsp"]["active"]]["port"],
user=content["mysql"][content["dsp"]["active"]]["username"],
passwd=content["mysql"][content["dsp"]["active"]]["password"],
db=content["mysql"][content["dsp"]["active"]]["dbname"],
use_unicode=False,
charset=content["mysql"]["db_charset"]
)
return self.__pool.connection()
# 释放连接池资源
# def __exit__(self, exc_type, exc_val, exc_tb):
# self.cursor.close()
# self.conn.close()
# 关闭连接归还给链接池
def close(self):
self.cursor.close()
self.conn.close()
# 从连接池中取出一个连接
def getconn(self, content):
conn = self.__getconn(content)
cursor = conn.cursor()
return cursor, conn
# 获取连接池,实例化
def get_my_connection(content):
return MyConnectionPool(content)
'''
执行语句查询有结果返回结果没有返回0增/删/改返回变更数据条数没有返回0
'''
class MySqLHelper(object):
def __init__(self, content):
logger.info("开始加载数据库连接池!")
self.db = get_my_connection(content)
logger.info("加载数据库连接池完成!")
def __new__(cls, *args, **kwargs):
if not hasattr(cls, 'inst'): # 单例
cls.inst = super(MySqLHelper, cls).__new__(cls, *args, **kwargs)
return cls.inst
# 封装执行命令
def execute(self, sql, param=None, autoclose=False):
"""
【主要判断是否有参数和是否执行完就释放连接】
:param sql: 字符串类型sql语句
:param param: sql语句中要替换的参数"select %s from tab where id=%s" 其中的%s就是参数
:param autoclose: 是否关闭连接
:return: 返回连接conn和游标cursor
"""
cursor, conn = self.db.getconn() # 从连接池获取连接
count = 0
try:
# count : 为改变的数据条数
if param:
count = cursor.execute(sql, param)
else:
count = cursor.execute(sql)
conn.commit()
if autoclose:
self.close(cursor, conn)
except Exception as e:
pass
return cursor, conn, count
# 执行多条命令
# def executemany(self, lis):
# """
# :param lis: 是一个列表里面放的是每个sql的字典'[{"sql":"xxx","param":"xx"}....]'
# :return:
# """
# cursor, conn = self.db.getconn()
# try:
# for order in lis:
# sql = order['sql']
# param = order['param']
# if param:
# cursor.execute(sql, param)
# else:
# cursor.execute(sql)
# conn.commit()
# self.close(cursor, conn)
# return True
# except Exception as e:
# print(e)
# conn.rollback()
# self.close(cursor, conn)
# return False
# 释放连接
def close(self, cursor, conn):
logger.info("开始释放数据库连接!")
cursor.close()
conn.close()
logger.info("释放数据库连接完成!")
# 查询所有
def selectall(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchall()
return res
except Exception as e:
logger.error("查询所有数据异常:")
logger.exception(e)
self.close(cursor, conn)
return count
# 查询单条
def selectone(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
res = cursor.fetchone()
self.close(cursor, conn)
return res
except Exception as e:
logger.error("查询单条数据异常:")
logger.exception(e)
self.close(cursor, conn)
return count
# 增加
def insertone(self, sql, param):
try:
cursor, conn, count = self.execute(sql, param)
# _id = cursor.lastrowid() # 获取当前插入数据的主键id该id应该为自动生成为好
conn.commit()
self.close(cursor, conn)
return count
# 防止表中没有id返回0
# if _id == 0:
# return True
# return _id
except Exception as e:
logger.error("新增数据异常:")
logger.exception(e)
conn.rollback()
self.close(cursor, conn)
return count
# 增加多行
def insertmany(self, sql, param):
"""
:param sql:
:param param: 必须是元组或列表[(),()]或((),())
:return:
"""
cursor, conn, count = self.db.getconn()
try:
cursor.executemany(sql, param)
conn.commit()
return count
except Exception as e:
logger.error("增加多条数据异常:")
logger.exception(e)
conn.rollback()
self.close(cursor, conn)
return count
# 删除
def delete(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
self.close(cursor, conn)
return count
except Exception as e:
logger.error("删除数据异常:")
logger.exception(e)
conn.rollback()
self.close(cursor, conn)
return count
# 更新
def update(self, sql, param=None):
try:
cursor, conn, count = self.execute(sql, param)
conn.commit()
self.close(cursor, conn)
return count
except Exception as e:
logger.error("更新数据异常:")
logger.exception(e)
conn.rollback()
self.close(cursor, conn)
return count