Python sqlalchemy 模块,engine() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.engine()。
def delete_obj(self, objects, uow):
"""called by a UnitOfWork object to delete objects,which involves a
DELETE statement for each table used by this mapper,for each object in the list."""
for table in self.tables:
if not self._has_pks(table):
continue
delete = []
for obj in objects:
params = {}
if not hasattr(obj, "_instance_key"):
continue
else:
delete.append(params)
for col in self.pks_by_table[table]:
params[col.key] = self._getattrbycolumn(obj, col)
self.extension.before_delete(self, obj)
if len(delete):
clause = sql.and_()
for col in self.pks_by_table[table]:
clause.clauses.append(col == sql.bindparam(col.key))
statement = table.delete(clause)
c = statement.execute(*delete)
if table.engine.supports_sane_rowcount() and c.rowcount != len(delete):
raise "ConcurrencyError - updated rowcount %d does not match number of objects updated %d" % (c.cursor.rowcount, len(delete))
def __init__(self, engine, statement, parameters=None, typemap=None, **kwargs):
"""constructs a new ANSICompiler object.
engine - sqlEngine to compile against
statement - ClauseElement to be compiled
parameters - optional dictionary indicating a set of bind parameters
specified with this Compiled object. These parameters are the "default"
key/value pairs when the Compiled is executed,and also may affect the
actual compilation,as in the case of an INSERT where the actual columns
inserted will correspond to the keys present in the parameters."""
sql.Compiled.__init__(self, parameters)
self.binds = {}
self.froms = {}
self.wheres = {}
self.strings = {}
self.select_stack = []
self.typemap = typemap or {}
self.isinsert = False
def retrieve_model_id_from_hash(db_engine, model_hash):
"""Retrieves a model id from the database that matches the given hash
Args:
db_engine (sqlalchemy.engine) A database engine
model_hash (str) The model hash to lookup
Returns: (int) The model id (if found in DB),None (if not)
"""
session = sessionmaker(bind=db_engine)()
try:
saved = session.query(Model)\
.filter_by(model_hash=model_hash)\
.one_or_none()
return saved.model_id if saved else None
finally:
session.close()
def save_db_objects(db_engine, db_objects):
"""Saves a collection of sqlAlchemy model objects to the database using a copY command
Args:
db_engine (sqlalchemy.engine)
db_objects (list) sqlAlchemy model objects,corresponding to a valid table
"""
with tempfile.TemporaryFile(mode='w+') as f:
writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
for db_object in db_objects:
writer.writerow([
getattr(db_object, col.name)
for col in db_object.__table__.columns
])
f.seek(0)
postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
def build_filter(cls, table, tree):
try:
operator, nodes = list(tree.items())[0]
except Exception:
raise indexer.QueryError()
try:
op = cls.multiple_operators[operator]
except KeyError:
try:
op = cls.binary_operators[operator]
except KeyError:
try:
op = cls.unary_operators[operator]
except KeyError:
raise indexer.QueryInvalidOperator(operator)
return cls._handle_unary_op(engine, op, nodes)
return cls._handle_binary_op(engine, nodes)
return cls._handle_multiple_op(engine, nodes)
def test_innodb_tables(self):
sa_migration.db_sync(engine=self.migrate_engine)
total = self.migrate_engine.execute(
"SELECT count(*) "
"FROM information_schema.TABLES "
"WHERE TABLE_SCHEMA = '%(database)s'" %
{'database': self.migrate_engine.url.database})
self.assertGreater(total.scalar(), 0, "No tables found. Wrong schema?")
noninnodb = self.migrate_engine.execute(
"SELECT count(*) "
"FROM information_schema.TABLES "
"WHERE TABLE_SCHEMA='%(database)s' "
"AND ENGINE != 'InnoDB' "
"AND TABLE_NAME != 'migrate_version'" %
{'database': self.migrate_engine.url.database})
count = noninnodb.scalar()
self.assertEqual(count, "%d non InnoDB tables created" % count)
def retrieve_model_id_from_hash(db_engine,None (if not)
"""
session = sessionmaker(bind=db_engine)()
try:
saved = session.query(Model)\
.filter_by(model_hash=model_hash)\
.one_or_none()
return saved.model_id if saved else None
finally:
session.close()
def _check_002(self, data):
for column in ['created_at', 'updated_at', 'id', 'instance_uuid',
'cell_id', 'project_id']:
self.assertColumnExists(engine, 'instance_mappings', column)
for index in ['instance_uuid_idx', 'project_id_idx']:
self.assertIndexExists(engine, index)
self.assertUniqueConstraintExists(engine,
['instance_uuid'])
inspector = reflection.Inspector.from_engine(engine)
# There should only be one foreign key here
fk = inspector.get_foreign_keys('instance_mappings')[0]
self.assertEqual('cell_mappings', fk['referred_table'])
self.assertEqual(['id'], fk['referred_columns'])
self.assertEqual(['cell_id'], fk['constrained_columns'])
def _check_006(self, data):
for column in ['id', 'request_spec_id', 'project_id', 'user_id',
'display_name', 'instance_Metadata', 'progress', 'vm_state',
'image_ref', 'access_ip_v4', 'access_ip_v6', 'info_cache',
'security_groups', 'config_drive', 'key_name', 'locked_by']:
self.assertColumnExists(engine, 'build_requests', column)
self.assertIndexExists(engine,
'build_requests_project_id_idx')
self.assertUniqueConstraintExists(engine,
['request_spec_id'])
inspector = reflection.Inspector.from_engine(engine)
# There should only be one foreign key here
fk = inspector.get_foreign_keys('build_requests')[0]
self.assertEqual('request_specs', fk['referred_columns'])
self.assertEqual(['request_spec_id'], fk['constrained_columns'])
def _check_275(self, data):
self.assertColumnExists(engine, 'key_pairs', 'type')
self.assertColumnExists(engine, 'shadow_key_pairs', 'type')
key_pairs = oslodbutils.get_table(engine, 'key_pairs')
shadow_key_pairs = oslodbutils.get_table(engine, 'shadow_key_pairs')
self.assertisinstance(key_pairs.c.type.type,
sqlalchemy.types.String)
self.assertisinstance(shadow_key_pairs.c.type.type,
sqlalchemy.types.String)
# Make sure the keypair entry will have the type 'ssh'
key_pairs = oslodbutils.get_table(engine, 'key_pairs')
keypair = key_pairs.select(
key_pairs.c.name == 'test-migr').execute().first()
self.assertEqual('ssh', keypair.type)
def _check_313(self, data):
self.assertColumnExists(engine, 'pci_devices', 'parent_addr')
self.assertColumnExists(engine, 'shadow_pci_devices', 'parent_addr')
pci_devices = oslodbutils.get_table(engine, 'pci_devices')
shadow_pci_devices = oslodbutils.get_table(
engine, 'shadow_pci_devices')
self.assertisinstance(pci_devices.c.parent_addr.type,
sqlalchemy.types.String)
self.assertTrue(pci_devices.c.parent_addr.nullable)
self.assertisinstance(shadow_pci_devices.c.parent_addr.type,
sqlalchemy.types.String)
self.assertTrue(shadow_pci_devices.c.parent_addr.nullable)
self.assertIndexMembers(engine,
'ix_pci_devices_compute_node_id_parent_addr_deleted',
['compute_node_id', 'parent_addr', 'deleted'])
def convert_result_value(self, engine):
# Todo: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
return value
def _cvt(self, fmt):
if value is None:
return None
parts = value.split('.')
try:
(value, microsecond) = value.split('.')
microsecond = int(microsecond)
except ValueError:
(value, microsecond) = (value, 0)
return time.strptime(value, fmt)[0:6] + (microsecond,)
def convert_result_value(self, engine):
tup = self._cvt(value, "%Y-%m-%d %H:%M:%s")
return tup and datetime.datetime(*tup)
def convert_result_value(self, "%Y-%m-%d")
return tup and datetime.date(*tup[0:3])
def get_params(self, **params):
"""returns a structure of bind parameters for this compiled object.
This includes bind parameters that might be compiled in via the "values"
argument of an Insert or Update statement object,and also the given **params.
The keys inside of **params can be any key that matches the BindParameterClause
objects compiled within this object. The output is dependent on the paramstyle
of the DBAPI being used; if a named style,the return result will be a dictionary
with keynames matching the compiled statement. If a positional style,the output
will be a list corresponding to the bind positions in the compiled statement.
for an executemany style of call,this method should be called for each element
in the list of parameter groups that will ultimately be executed.
"""
if self.parameters is not None:
bindparams = self.parameters.copy()
else:
bindparams = {}
bindparams.update(params)
if self.engine.positional:
d = OrderedDict()
for k in self.positiontup:
b = self.binds[k]
d[k] = b.typeprocess(b.value, self.engine)
else:
d = {}
for b in self.binds.values():
d[b.key] = b.typeprocess(b.value, self.engine)
for key, value in bindparams.iteritems():
try:
b = self.binds[key]
except KeyError:
continue
d[b.key] = b.typeprocess(value, self.engine)
return d
if self.engine.positional:
return d.values()
else:
return d
def get_named_params(self, parameters):
"""given the results of the get_params method,returns the parameters
in dictionary format. For a named paramstyle,this just returns the
same dictionary. For a positional paramstyle,the given parameters are
assumed to be in list format and are converted back to a dictionary.
"""
# return parameters
if self.engine.positional:
p = {}
for i in range(0, len(self.positiontup)):
p[self.positiontup[i]] = parameters[i]
return p
else:
return parameters
def bindparam_string(self, name):
return self.engine.bindtemplate % name
def get_column_default_string(self, column):
if isinstance(column.default, schema.PassiveDefault):
if isinstance(column.default.arg, str):
return repr(column.default.arg)
else:
return str(column.default.arg.compile(self.engine))
else:
return None
def __init__(self, connection=None, echo=False):
"""
:param str connection: sqlAlchemy
:param bool echo: True or False for sql output of sqlAlchemy engine
"""
log.setLevel(logging.INFO)
handler = logging.FileHandler(os.path.join(PYUNIPROT_DIR, defaults.TABLE_PREFIX + 'database.log'))
handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
try:
self.connection = get_connection_string(connection)
self.engine = sqlalchemy.create_engine(self.connection, echo=echo)
self.inspector = reflection.Inspector.from_engine(self.engine)
self.sessionmaker = sessionmaker(
bind=self.engine,
autoflush=False,
autocommit=False,
expire_on_commit=True
)
self.session = scoped_session(self.sessionmaker)
except:
log.warning('No valid database connection. Execute `pyuniprot connection` on command line')
def _create_tables(self, checkfirst=True):
"""creates all tables from models in your database
:param checkfirst: True or False check if tables already exists
:type checkfirst: bool
:return:
"""
log.info('create tables in {}'.format(self.engine.url))
models.Base.Metadata.create_all(self.engine, checkfirst=checkfirst)
def _drop_tables(self):
"""drops all tables in the database"""
log.info('drop tables in {}'.format(self.engine.url))
self.session.commit()
models.Base.Metadata.drop_all(self.engine)
self.session.commit()
def test_basic_query(self, conn):
rows = conn.execute('SELECT * FROM one_row').fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0].number_of_rows, 1)
self.assertEqual(len(rows[0]), 1)
def test_reflect_no_such_table(self, conn):
self.assertRaises(
NoSuchTableError,
lambda: Table('this_does_not_exist', MetaData(bind=engine), autoload=True))
self.assertRaises(
NoSuchTableError,
schema='also_does_not_exist', autoload=True))
def test_reflect_table(self, connection):
one_row = Table('one_row', autoload=True)
self.assertEqual(len(one_row.c), 1)
self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_with_schema(self,
schema=SCHEMA, 1)
self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_include_columns(self, connection):
one_row_complex = Table('one_row_complex', MetaData(bind=engine))
engine.dialect.reflecttable(
connection, one_row_complex, include_columns=['col_int'], exclude_columns=[])
self.assertEqual(len(one_row_complex.c), 1)
self.assertIsNotNone(one_row_complex.c.col_int)
self.assertRaises(AttributeError, lambda: one_row_complex.c.col_tinyint)
def test_reflect_schemas(self, connection):
insp = sqlalchemy.inspect(engine)
schemas = insp.get_schema_names()
self.assertIn(SCHEMA, schemas)
self.assertIn('default', schemas)
def test_reflect_select(self, autoload=True)
self.assertEqual(len(one_row_complex.c), 15)
self.assertisinstance(one_row_complex.c.col_string, Column)
rows = one_row_complex.select().execute().fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(list(rows[0]), [
True,
127,
32767,
2147483647,
9223372036854775807,
0.5,
0.25,
'a string',
datetime(2017, 1, 0),
date(2017, 2),
b'123',
'[1,2]',
'{1=2,3=4}',
'{a=1,b=2}',
Decimal('0.1'),
])
self.assertisinstance(one_row_complex.c.col_boolean.type, BOOLEAN)
self.assertisinstance(one_row_complex.c.col_tinyint.type, INTEGER)
self.assertisinstance(one_row_complex.c.col_smallint.type, INTEGER)
self.assertisinstance(one_row_complex.c.col_int.type, INTEGER)
self.assertisinstance(one_row_complex.c.col_bigint.type, BIGINT)
self.assertisinstance(one_row_complex.c.col_float.type, FLOAT)
self.assertisinstance(one_row_complex.c.col_double.type, FLOAT)
self.assertisinstance(one_row_complex.c.col_string.type, type(STRINGTYPE))
self.assertisinstance(one_row_complex.c.col_timestamp.type, TIMESTAMP)
self.assertisinstance(one_row_complex.c.col_date.type, DATE)
self.assertisinstance(one_row_complex.c.col_binary.type, BINARY)
self.assertisinstance(one_row_complex.c.col_array.type, type(STRINGTYPE))
self.assertisinstance(one_row_complex.c.col_map.type, type(STRINGTYPE))
self.assertisinstance(one_row_complex.c.col_struct.type, type(STRINGTYPE))
self.assertisinstance(one_row_complex.c.col_decimal.type, DECIMAL)
def init(settings, webapp=False, db_info=None):
global db_url, session_maker
if db_info:
db_url = db_info.url
else:
db_url = _dbUrl(web.CONfig)
settings["sqlalchemy.url"] = db_url
web.CONfig.set("mishmash", "sqlalchemy.url", settings["sqlalchemy.url"])
settings["sqlalchemy.convert_unicode"] = \
web.CONfig.get("mishmash", "sqlalchemy.convert_unicode")
settings["sqlalchemy.encoding"] = web.CONfig.get("mishmash",
"sqlalchemy.encoding")
if not db_info:
config = Namespace()
config.db_url = db_url
config.varIoUs_artists_name = web.CONfig.get("mishmash",
"varIoUs_artists_name")
db_info = dbinit(config.db_url)
db_engine = db_info.engine
session_maker = db_info.SessionMaker
initAlembic(db_url)
def test_reflect_no_such_table(self, autoload=True))
def test_reflect_table(self, 1)
self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_with_schema(self, 1)
self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_include_columns(self, lambda: one_row_complex.c.col_tinyint)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。