Python psycopg2 模块,pool() 实例源码
我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用psycopg2.pool()。
def __init__(self, dbname: str, isolation: str = "auto-commit"):
    """Borrow a pooled connection for ``dbname`` and set its isolation level.

    A connection is only taken from the pool when ``dbname`` equals the
    configured placeholder name; for any other name the instance is left
    without a pool and without a connection.

    :param dbname: name of the database to connect to
    :param isolation: key looked up in ``self.isolation_opts``; unknown keys
        fall back to psycopg2's autocommit isolation level
    :raises psycopg2.pool.PoolError: when the pool cannot hand out a
        connection; the instance's pool and connection are reset to None
    :raises Exception: any other failure is printed via ``tb.print_exc()``
        and re-raised unchanged
    """
    # Establishes the connection with the backend databases.
    isolation_level = self.isolation_opts.get(
        isolation, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    try:
        if dbname == "YOUR_DATABASE_NAME":
            self.__conn = self.__pool.getconn()
            self.__conn.set_isolation_level(isolation_level)
        else:
            self.__pool = None
            self.__conn = None
    except psycopg2.pool.PoolError:
        self.__pool = None
        self.__conn = None
        # Bare raise re-raises the active exception with its traceback intact.
        raise
    except Exception:
        tb.print_exc()
        raise
def get_db_connection():
    """
    psycopg2 connection context manager.

    Fetch a connection from the module-level ``pool``, yield it to the
    caller, and hand it back to the pool once the caller is done (also when
    the caller's code raises).

    The connection is acquired *before* entering the ``try`` block: if
    ``pool.getconn()`` itself fails there is nothing to give back, and the
    original error must propagate instead of being masked by a failure in
    the cleanup code.

    NOTE(review): no ``contextlib.contextmanager`` decorator is visible on
    this definition; confirm it is applied wherever this generator is used
    in a ``with`` statement.
    """
    connection = pool.getconn()
    try:
        yield connection
    finally:
        pool.putconn(connection)
def __init__(self, host, database, user, autocommit=True, pool=PostgresPool,
             default_schema='public'):
    """
    Instantiate class from given SourceDb entry (one row in the datasource table)

    Stores the connection identity on the instance and makes sure that
    ``self.connection_pools`` holds a pool for this database name. A pool is
    only built when no entry exists for the name yet.

    :param host: Host address (domain or IP)
    :param database: Name of the database
    :param user: User
    :param autocommit: forwarded unchanged to the pool factory
    :param pool: callable that builds the pool from the keyword options below
    :param default_schema: forwarded unchanged to the pool factory
    """
    self.user = user
    self.hostname = host
    self.dbname = database

    # Nothing to do when a pool for this database is already registered.
    if self.dbname in self.connection_pools:
        return

    # the password is taken from ~/.pgpass
    pool_options = {
        'autocommit': autocommit,
        'minconn': 1,
        'maxconn': 5,
        'user': self.user,
        'host': self.hostname,
        'database': self.dbname,
        'default_schema': default_schema,
        'connect_timeout': config.DB_CONNECT_TIMEOUT,
    }
    self.connection_pools[self.dbname] = pool(**pool_options)
def _connect(self):
    """Open the database connection (or connection pool) and prepare queries.

    When ``self.application_name`` is unset it defaults to the base name of
    the file recorded in the last entry of ``inspect.stack()``. Depending on
    ``self.pooling`` either a ``ThreadedConnectionPool`` is stored in
    ``self._pool`` or a single autocommit connection in ``self._conn``.
    Afterwards the schema check runs and every stored query has its ``?``
    placeholders rewritten to ``%s``.
    """
    if self.application_name is None:
        last_frame = inspect.stack()[-1]
        self.application_name = os.path.basename(last_frame[1])

    connect_kwargs = {
        "host": self.url.hostname,
        "port": self.url.port,
        "database": self.url.database,
        "user": self.url.username,
        "password": self.url.password,
        "application_name": self.application_name + "/" + hgvs.__version__,
    }

    if not self.pooling:
        self._conn = psycopg2.connect(**connect_kwargs)
        self._conn.autocommit = True
    else:
        _logger.info("Using UTA ThreadedConnectionPool")
        self._pool = psycopg2.pool.ThreadedConnectionPool(
            hgvs.global_config.uta.pool_min,
            hgvs.global_config.uta.pool_max,
            **connect_kwargs)

    self._ensure_schema_exists()

    # remap sqlite's ? placeholders to psycopg2's %s
    remapped = {}
    for query_name, query_sql in six.iteritems(self._queries):
        remapped[query_name] = query_sql.replace('?', '%s')
    self._queries = remapped
def __del__(self):
    """Give the held connection back to the pool on finalization.

    Implicitly called when garbage collection cleans up the object. Does
    nothing unless both a pool and a connection are present; after the
    connection has been returned, both references are cleared.
    """
    if not self.__pool or not self.__conn:
        return
    self.__pool.putconn(self.__conn)
    self.__pool = None
    self.__conn = None
def disconnect(self):
    """Close the held connection instead of returning it to the pool.

    The connection is handed to ``putconn`` with ``close=True``. Does
    nothing unless both a pool and a connection are present; afterwards
    both references are cleared.
    """
    if not self.__pool or not self.__conn:
        return
    self.__pool.putconn(self.__conn, close=True)
    self.__pool = None
    self.__conn = None
def getGavoDBConn():
    """Return a connection taken from the module-level ``g_db_pool``.

    Raises ``Exception`` when ``g_db_pool`` is falsy (e.g. still None).
    """
    if not g_db_pool:
        raise Exception('connection pool is None when get conn')
    return g_db_pool.getconn()
def putGavoDBConn(conn):
    """Hand ``conn`` back to the module-level ``g_db_pool``.

    Raises ``Exception`` when ``g_db_pool`` is falsy (e.g. still None).
    """
    if not g_db_pool:
        raise Exception('connection pool is None when put conn')
    g_db_pool.putconn(conn)
def close(self):
    """Shut down the pool or the single connection held by this object.

    With pooling enabled every pooled connection is closed via
    ``closeall()``; otherwise the single connection is closed if one is
    set. A warning is logged in both cases.
    """
    if not self.pooling:
        _logger.warning("Closing connection; future mapping and validation will fail.")
        single_conn = self._conn
        if single_conn is not None:
            single_conn.close()
        return

    _logger.warning("Closing pool; future mapping and validation will fail.")
    self._pool.closeall()
def close(self):
    """Shut down the pool or the single connection held by this object.

    With pooling enabled every pooled connection is closed via
    ``closeall()``; otherwise the single connection is closed if one is
    set. A warning is logged in both cases.
    """
    if not self.pooling:
        _logger.warning("Closing connection; future mapping and validation will fail.")
        single_conn = self._conn
        if single_conn is not None:
            single_conn.close()
        return

    _logger.warning("Closing pool; future mapping and validation will fail.")
    self._pool.closeall()
def connection_pool(max_conn, dsn_dict: Dict[str, str], application_name=psycopg2.__name__):
    """
    Build a threaded connection pool whose connections all share one DSN.

    The pool starts with a minimum of one connection and holds up to
    ``max_conn``. The connection keyword arguments are derived from
    ``dsn_dict`` and ``application_name`` by ``_dsn_connection_values``.
    """
    min_conn = 1
    pool_kwargs = _dsn_connection_values(dsn_dict, application_name)
    return psycopg2.pool.ThreadedConnectionPool(min_conn, max_conn, **pool_kwargs)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。