Python sqlalchemy 模块,pool() 实例源码
我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用 sqlalchemy.pool 模块。
def sqlite_performance_over_safety(
        dbapi_con: sqlite3.Connection,
        con_record: sqlalchemy.pool._ConnectionRecord  # pylint: disable=protected-access
) -> None:
    """Trade crash-safety for insert speed on a SQLite connection.

    Intended as a connect-event listener: keeps the rollback journal in
    memory and disables fsync.  Significantly speeds up inserts but will
    lose data if the process crashes mid-transaction.
    """
    con_record  # pylint: disable=pointless-statement
    dbapi_con.execute('PRAGMA journal_mode = MEMORY')
    dbapi_con.execute('PRAGMA synchronous = OFF')
def apply_driver_hacks(self, app, info, options):
    """This method is called before engine creation and used to inject
    driver specific hacks into the options.  The `options` parameter is
    a dictionary of keyword arguments that will then be used to call
    the :func:`sqlalchemy.create_engine` function.

    The default implementation provides some saner defaults for things
    like pool sizes for MySQL and SQLite.  Also it injects the setting of
    `SQLALCHEMY_NATIVE_UNICODE`.
    """
    # NOTE: drivername strings were case-mangled in the scraped copy
    # ('MysqL'); SQLAlchemy drivernames are lowercase, so those branches
    # could never match.  Restored to 'mysql'.
    if info.drivername.startswith('mysql'):
        info.query.setdefault('charset', 'utf8')
        if info.drivername != 'mysql+gaerdbms':
            options.setdefault('pool_size', 10)
            options.setdefault('pool_recycle', 7200)
    elif info.drivername == 'sqlite':
        pool_size = options.get('pool_size')
        detected_in_memory = False
        if info.database in (None, '', ':memory:'):
            detected_in_memory = True
            from sqlalchemy.pool import StaticPool
            options['poolclass'] = StaticPool
            if 'connect_args' not in options:
                options['connect_args'] = {}
            options['connect_args']['check_same_thread'] = False
            # we go to memory and the pool size was explicitly set to 0
            # which is fail.  Let the user know that
            if pool_size == 0:
                raise RuntimeError('sqlite in memory database with an '
                                   'empty queue not possible due to data '
                                   'loss.')
        # if pool size is None or explicitly set to 0 we assume the
        # user did not want a queue for this sqlite connection and
        # hook in the null pool.
        elif not pool_size:
            from sqlalchemy.pool import NullPool
            options['poolclass'] = NullPool
        # if it's not an in memory database we make the path absolute.
        if not detected_in_memory:
            info.database = os.path.join(app.root_path, info.database)
    unu = app.config['SQLALCHEMY_NATIVE_UNICODE']
    if unu is None:
        unu = self.use_native_unicode
    if not unu:
        options['use_native_unicode'] = False
def apply_driver_hacks(self, app, info, options):
    """Inject driver specific defaults into the ``options`` dict that will
    be passed to :func:`sqlalchemy.create_engine`.

    NOTE(review): the scraped header of this example was garbled
    (``def apply_driver_hacks(self, ':memory:'):``) and swallowed the
    beginning of the body; the function is reconstructed here around the
    fragment that survived (the sqlite in-memory branch onward).
    """
    if info.drivername.startswith('mysql'):
        info.query.setdefault('charset', 'utf8')
        if info.drivername != 'mysql+gaerdbms':
            options.setdefault('pool_size', 10)
            options.setdefault('pool_recycle', 7200)
    elif info.drivername == 'sqlite':
        pool_size = options.get('pool_size')
        detected_in_memory = False
        if info.database in (None, '', ':memory:'):
            detected_in_memory = True
            from sqlalchemy.pool import StaticPool
            options['poolclass'] = StaticPool
            if 'connect_args' not in options:
                options['connect_args'] = {}
            options['connect_args']['check_same_thread'] = False
            # in-memory database with an explicit pool size of 0 would
            # drop the only connection and the data with it
            if pool_size == 0:
                raise RuntimeError('sqlite in memory database with an '
                                   'empty queue not possible due to data '
                                   'loss.')
        # if pool size is None or explicitly set to 0 we assume the
        # user did not want a queue for this sqlite connection and
        # hook in the null pool.
        elif not pool_size:
            from sqlalchemy.pool import NullPool
            options['poolclass'] = NullPool
        # if it's not an in memory database we make the path absolute.
        if not detected_in_memory:
            info.database = os.path.join(app.root_path, info.database)
    unu = app.config['SQLALCHEMY_NATIVE_UNICODE']
    if unu is None:
        unu = self.use_native_unicode
    if not unu:
        options['use_native_unicode'] = False
def apply_driver_hacks(self, app, info, options):
    """Inject driver specific defaults into the ``options`` dict that will
    be passed to :func:`sqlalchemy.create_engine`.

    NOTE(review): the scraped header of this example was garbled
    (``def apply_driver_hacks(self, 7200)``), swallowing the mysql branch
    up to the ``7200`` literal; reconstructed around the surviving
    fragment.  This (older) variant only treats ``None``/'':memory:'' as
    in-memory and does not check for the empty-string database name.
    """
    if info.drivername.startswith('mysql'):
        info.query.setdefault('charset', 'utf8')
        options.setdefault('pool_size', 10)
        options.setdefault('pool_recycle', 7200)
    elif info.drivername == 'sqlite':
        pool_size = options.get('pool_size')
        detected_in_memory = False
        if info.database in (None, ':memory:'):
            detected_in_memory = True
            from sqlalchemy.pool import StaticPool
            options['poolclass'] = StaticPool
            if 'connect_args' not in options:
                options['connect_args'] = {}
            options['connect_args']['check_same_thread'] = False
            # we go to memory and the pool size was explicitly set
            # to 0 which is fail.  Let the user know that
            if pool_size == 0:
                raise RuntimeError('sqlite in memory database with an '
                                   'empty queue not possible due to data '
                                   'loss.')
        # if pool size is None or explicitly set to 0 we assume the
        # user did not want a queue for this sqlite connection and
        # hook in the null pool.
        elif not pool_size:
            from sqlalchemy.pool import NullPool
            options['poolclass'] = NullPool
        # if it's not an in memory database we make the path absolute.
        if not detected_in_memory:
            info.database = os.path.join(app.root_path, info.database)
    unu = app.config['SQLALCHEMY_NATIVE_UNICODE']
    if unu is None:
        unu = self.use_native_unicode
    if not unu:
        options['use_native_unicode'] = False
def __init__(self, namespace, url=None, sa_opts=None, optimistic=False,
             table_name='beaker_cache', data_dir=None, lock_dir=None,
             **params):
    """Creates a database namespace manager

    ``url``
        SQLAlchemy compliant db url
    ``sa_opts``
        A dictionary of SQLAlchemy keyword options to initialize the engine
        with.
    ``optimistic``
        Use optimistic session locking, note that this will result in an
        additional select when updating a cache value to compare version
        numbers.
    ``table_name``
        The table name to use in the database for the cache.
    """
    OpenResourceNamespaceManager.__init__(self, namespace)

    if sa_opts is None:
        sa_opts = params

    # An explicit lock_dir wins; otherwise derive one from data_dir.
    if lock_dir:
        self.lock_dir = lock_dir
    elif data_dir:
        self.lock_dir = data_dir + "/container_db_lock"
    if self.lock_dir:
        verify_directory(self.lock_dir)

    # Check to see if the table's been created before
    url = url or sa_opts['sa.url']
    table_key = url + table_name

    def make_cache():
        # Check to see if we have a connection pool open already
        meta_key = url + table_name

        def make_meta():
            # SQLAlchemy pops the url, this ensures it sticks around
            # later
            sa_opts['sa.url'] = url
            engine = sa.engine_from_config(sa_opts, 'sa.')
            meta = sa.MetaData()
            meta.bind = engine
            return meta
        meta = DatabaseNamespaceManager.metadatas.get(meta_key, make_meta)
        # Create the table object and cache it now.
        # NOTE(review): the scraped copy truncated the 'accessed',
        # 'created' and 'data' column definitions (a syntax error);
        # restored as non-nullable DateTime/PickleType columns.
        cache = sa.Table(table_name, meta,
                         sa.Column('id', types.Integer, primary_key=True),
                         sa.Column('namespace', types.String(255),
                                   nullable=False),
                         sa.Column('accessed', types.DateTime,
                                   nullable=False),
                         sa.Column('created', types.DateTime,
                                   nullable=False),
                         sa.Column('data', types.PickleType,
                                   nullable=False),
                         sa.UniqueConstraint('namespace'))
        cache.create(checkfirst=True)
        return cache
    self.hash = {}
    self._is_new = False
    self.loaded = False
    self.cache = DatabaseNamespaceManager.tables.get(table_key, make_cache)
def __init__(self, config):
    """
    @param config: Configuration file name
    """
    parser = ConfigParser()
    # Close the config file deterministically instead of leaking the
    # handle from a bare open() call.
    with open(config) as cfg_file:
        parser.readfp(cfg_file, config)

    # engine is reused between multiple processes, make sure that we don't
    # share connections by disabling pool
    options = dict(parser.items("database"))
    self._engine = sqlalchemy.create_engine(options['url'],
                                            poolclass=NullPool,
                                            isolation_level='READ_COMMITTED')
    self._metadata = MetaData(self._engine)
    self._tables = {}

    # The [l1db] section is optional; fall back to defaults if absent.
    try:
        options = dict(parser.items("l1db"))
    except NoSectionError:
        options = {}
    self._dia_object_index = options.get('dia_object_index', 'baseline')
    self._dia_object_nightly = bool(int(options.get('dia_object_nightly', 0)))
    self._months_sources = int(options.get('read_sources_months', 0))
    self._months_fsources = int(options.get('read_forced_sources_months', 0))
    self._read_full_objects = bool(int(options.get('read_full_objects', 0)))
    self._source_select = options.get('source_select', "by-fov")
    self._object_last_replace = bool(int(options.get('object_last_replace', 0)))
    # Validate enumerated options early so misconfiguration fails loudly.
    if self._dia_object_index not in ('baseline', 'htm20_id_iov', 'last_object_table'):
        raise ValueError('unexpected dia_object_index value: ' + str(self._dia_object_index))
    if self._source_select not in ('by-fov', 'by-oid'):
        raise ValueError('unexpected source_select value: ' + self._source_select)

    _LOG.info("L1DB Configuration:")
    _LOG.info("    dia_object_index: %s", self._dia_object_index)
    _LOG.info("    dia_object_nightly: %s", self._dia_object_nightly)
    _LOG.info("    read_sources_months: %s", self._months_sources)
    _LOG.info("    read_forced_sources_months: %s", self._months_fsources)
    _LOG.info("    read_full_objects: %s", self._read_full_objects)
    _LOG.info("    source_select: %s", self._source_select)
    _LOG.info("    object_last_replace: %s", self._object_last_replace)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。