Python sqlalchemy.util 模块,OrderedSet() 实例源码
我们从 Python 开源项目中，提取了以下 34 个代码示例，用于说明如何使用 sqlalchemy.util.OrderedSet()。
def _autogen_for_tables(autogen_context, upgrade_ops, schemas):
    """Gather reflected and declared table names and pass them to ``_compare_tables``.

    For every schema in ``schemas`` the inspector is asked for its table
    names; each is stored as a ``(schema, table_name)`` pair.  The
    migration version table is removed from both the reflected set and
    the metadata-derived set before the comparison.

    :param autogen_context: object providing ``inspector``, ``metadata``
        and ``migration_context``.
    :param upgrade_ops: not used by this function body.
    :param schemas: iterable of schema names to reflect.
    """
    inspector = autogen_context.inspector
    metadata = autogen_context.metadata

    migration_context = autogen_context.migration_context
    version_table_schema = migration_context.version_table_schema
    version_table = migration_context.version_table

    conn_table_names = set()
    for s in schemas:
        tables = set(inspector.get_table_names(schema=s))
        if s == version_table_schema:
            # Keep the migration bookkeeping table out of the comparison.
            tables = tables.difference([version_table])
        conn_table_names.update((s, tname) for tname in tables)

    metadata_table_names = OrderedSet(
        [(table.schema, table.name) for table in metadata.sorted_tables]
    ).difference([(version_table_schema, version_table)])

    # NOTE(review): ``upgrade_ops`` is accepted but never forwarded; the
    # call below may be missing an argument -- confirm against the
    # signature of ``_compare_tables``.
    _compare_tables(conn_table_names, metadata_table_names,
                    inspector, metadata, autogen_context)
def _autogen_for_tables(autogen_context, schemas):
inspector = autogen_context.inspector
conn_table_names = set()
version_table_schema = \
autogen_context.migration_context.version_table_schema
version_table = autogen_context.migration_context.version_table
for s in schemas:
tables = set(inspector.get_table_names(schema=s))
if s == version_table_schema:
tables = tables.difference(
[autogen_context.migration_context.version_table]
)
conn_table_names.update(zip([s] * len(tables), table.name) for table in autogen_context.sorted_tables]
).difference([(version_table_schema, autogen_context)
def _autogen_for_tables(autogen_context, autogen_context)
def _autogen_for_tables(autogen_context, autogen_context)
def _autogen_for_tables(autogen_context, autogen_context)
def _autogen_for_tables(autogen_context, autogen_context)
def _produce_net_changes(connection, diffs, autogen_context,
                         object_filters=(),
                         include_schemas=False):
    """Reflect the table names on ``connection`` and call ``_compare_tables``.

    :param connection: passed to ``Inspector.from_engine``; its
        ``dialect.default_schema_name`` is read when ``include_schemas``
        is true.
    :param diffs: not used by this function body.
    :param autogen_context: forwarded to ``_compare_tables``.
    :param object_filters: forwarded to ``_compare_tables``.
    :param include_schemas: when true, every schema reported by the
        inspector is reflected (except ``information_schema``); otherwise
        only the schema ``None`` is reflected.
    """
    inspector = Inspector.from_engine(connection)
    # TODO: not hardcode alembic_version here ?
    conn_table_names = set()

    if include_schemas:
        schemas = set(inspector.get_schema_names())
        # never reflect the information schema
        schemas.discard("information_schema")
        # add None, then drop the dialect's default schema name
        # (order matters if that name is itself None)
        schemas.add(None)
        schemas.discard(connection.dialect.default_schema_name)
    else:
        schemas = [None]

    for s in schemas:
        tables = set(inspector.get_table_names(schema=s)).difference(
            ['alembic_version'])
        conn_table_names.update(zip([s] * len(tables), tables))

    # NOTE(review): ``Metadata`` is neither a parameter nor assigned in
    # this function, and ``Metadata_table_names`` and ``diffs`` are never
    # passed on -- this snippet looks incomplete; confirm against the
    # upstream definition before relying on it.
    Metadata_table_names = OrderedSet(
        [(table.schema, table.name) for table in Metadata.sorted_tables])

    _compare_tables(conn_table_names,
                    object_filters, autogen_context)
###################################################
# element comparison
###################################################
# render python
###################################################
# produce command structure
def _produce_net_changes(connection, autogen_context)
###################################################
# element comparison
###################################################
# render python
###################################################
# produce command structure
def _produce_net_changes(connection,
object_filters=(),
include_schemas=False):
inspector = Inspector.from_engine(connection)
conn_table_names = set()
default_schema = connection.dialect.default_schema_name
if include_schemas:
schemas = set(inspector.get_schema_names())
# replace default schema name with None
schemas.discard("information_schema")
# replace the "default" schema with None
schemas.add(None)
schemas.discard(default_schema)
else:
schemas = [None]
version_table_schema = autogen_context['context'].version_table_schema
version_table = autogen_context['context'].version_table
for s in schemas:
tables = set(inspector.get_table_names(schema=s))
if s == version_table_schema:
tables = tables.difference(
[autogen_context['context'].version_table]
)
conn_table_names.update(zip([s] * len(tables), autogen_context)
def _revision_map(self):
    """Build and return the revision map from ``self._generator()``.

    The returned dict maps each ``revision.revision`` to its revision
    object, plus ``None`` and ``()`` which both map to ``None``.  As a
    side effect this sets ``self.bases``, ``self._real_bases``,
    ``self.heads`` and ``self._real_heads`` (all tuples).
    """
    map_ = {}

    heads = sqlautil.OrderedSet()
    _real_heads = sqlautil.OrderedSet()
    self.bases = ()
    self._real_bases = ()

    has_branch_labels = set()
    has_depends_on = set()
    for revision in self._generator():
        if revision.revision in map_:
            # A duplicate only warns; the later entry replaces the earlier.
            util.warn("Revision %s is present more than once" %
                      revision.revision)
        map_[revision.revision] = revision
        if revision.branch_labels:
            has_branch_labels.add(revision)
        if revision.dependencies:
            has_depends_on.add(revision)
        # Every revision starts out as a head candidate; entries are
        # discarded below once another revision points down at them.
        heads.add(revision.revision)
        _real_heads.add(revision.revision)
        if revision.is_base:
            self.bases += (revision.revision, )
        if revision._is_real_base:
            self._real_bases += (revision.revision, )

    # add the branch_labels to the map_.  We'll need these
    # to resolve the dependencies.
    for revision in has_branch_labels:
        self._map_branch_labels(revision, map_)

    for revision in has_depends_on:
        self._add_depends_on(revision, map_)

    for rev in map_.values():
        for downrev in rev._all_down_revisions:
            if downrev not in map_:
                util.warn("Revision %s referenced from %s is not present"
                          % (downrev, rev))
            # NOTE: when the warning above fired, this lookup raises
            # KeyError because ``map_`` is a plain dict.
            down_revision = map_[downrev]
            down_revision.add_nextrev(rev)
            if downrev in rev._versioned_down_revisions:
                heads.discard(downrev)
            _real_heads.discard(downrev)

    map_[None] = map_[()] = None
    self.heads = tuple(heads)
    self._real_heads = tuple(_real_heads)

    for revision in has_branch_labels:
        self._add_branches(revision, map_, map_branch_labels=False)
    return map_
def _compare_columns(schema, tname, conn_table, Metadata_table,
                     modify_table_ops, inspector):
    """Append add/alter/drop column operations for one table.

    This is a generator with a single bare ``yield``.  Added and altered
    columns are processed before the ``yield``; removed columns are
    processed when the generator is resumed.

    :param schema: schema name, or a falsy value for no schema prefix.
    :param tname: table name.
    :param conn_table: reflected table; its ``.c`` collection is read.
    :param Metadata_table: declared table; its ``.c`` collection is read.
    :param modify_table_ops: object whose ``ops`` list receives the
        detected operations.
    :param inspector: not used by this function body.
    """
    # NOTE(review): ``autogen_context`` is not a parameter of this
    # function and is not assigned in it, so it must resolve from an
    # outer scope -- confirm.  The argument lists of the ``run_filters``
    # and ``ops.*`` calls below also look shorter than expected; verify
    # them against the Alembic API in use.
    name = '%s.%s' % (schema, tname) if schema else tname
    metadata_cols_by_name = {c.name: c for c in Metadata_table.c}
    conn_col_names = {c.name: c for c in conn_table.c}
    metadata_col_names = OrderedSet(sorted(metadata_cols_by_name))

    # Columns declared in the metadata but absent from the database.
    for cname in metadata_col_names.difference(conn_col_names):
        if autogen_context.run_filters(
                metadata_cols_by_name[cname], cname,
                "column", False, None):
            modify_table_ops.ops.append(
                ops.AddColumnop.from_column_and_tablename(
                    schema, metadata_cols_by_name[cname])
            )
            log.info("Detected added column '%s.%s'", name, cname)

    # Columns present on both sides: dispatch the per-column comparators.
    for colname in metadata_col_names.intersection(conn_col_names):
        metadata_col = metadata_cols_by_name[colname]
        conn_col = conn_table.c[colname]
        if not autogen_context.run_filters(
                metadata_col, colname, "column",
                conn_col):
            continue
        alter_column_op = ops.AlterColumnop(
            tname, schema=schema)

        comparators.dispatch("column")(
            autogen_context, alter_column_op,
            schema, conn_col, metadata_col
        )

        if alter_column_op.has_changes():
            modify_table_ops.ops.append(alter_column_op)

    yield

    # Columns present in the database but no longer in the metadata.
    for cname in set(conn_col_names).difference(metadata_col_names):
        if autogen_context.run_filters(
                conn_table.c[cname], True, None):
            modify_table_ops.ops.append(
                ops.DropColumnop.from_column_and_tablename(
                    schema, conn_table.c[cname]
                )
            )
            # Two placeholders need two arguments, as in the
            # "added column" message above.
            log.info("Detected removed column '%s.%s'", name, cname)
def _compare_columns(schema, object_filters,
diffs, c) for c in conn_table.c)
Metadata_col_names = OrderedSet(sorted(Metadata_cols_by_name))
for cname in Metadata_col_names.difference(conn_col_names):
if _run_filters(Metadata_cols_by_name[cname],
"column", None, object_filters):
diffs.append(
("add_column", schema, cname)
for cname in set(conn_col_names).difference(Metadata_col_names):
rem_col = sa_schema.Column(
cname,
conn_table.c[cname].type,
nullable=conn_table.c[cname].nullable,
server_default=conn_table.c[cname].server_default
)
if _run_filters(rem_col, object_filters):
diffs.append(
("remove_column", rem_col)
)
log.info("Detected removed column '%s.%s'", cname)
for colname in Metadata_col_names.intersection(conn_col_names):
Metadata_col = Metadata_cols_by_name[colname]
conn_col = conn_table.c[colname]
if not _run_filters(
Metadata_col, object_filters):
continue
col_diff = []
_compare_type(schema,
conn_col,
Metadata_col,
col_diff, autogen_context
)
_compare_nullable(schema,
Metadata_col.nullable, autogen_context
)
_compare_server_default(schema, autogen_context
)
if col_diff:
diffs.append(col_diff)
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _compare_columns(schema, autogen_context
)
if col_diff:
diffs.append(col_diff)
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _compare_columns(schema,
diffs,
"column", cname)
for colname in Metadata_col_names.intersection(conn_col_names):
Metadata_col = Metadata_cols_by_name[colname]
conn_col = conn_table.c[colname]
if not _run_filters(
Metadata_col,
conn_col,
conn_col,
Metadata_col,
col_diff, autogen_context
)
# work around sqlAlchemy issue #3023
if not Metadata_col.primary_key:
_compare_nullable(schema,
conn_col,
Metadata_col.nullable,
col_diff, autogen_context
)
_compare_server_default(schema,
conn_col,
Metadata_col,
col_diff, autogen_context
)
if col_diff:
diffs.append(col_diff)
yield
for cname in set(conn_col_names).difference(Metadata_col_names):
if _run_filters(conn_table.c[cname], conn_table.c[cname])
)
log.info("Detected removed column '%s.%s'", cname)
def _revision_map(self):
"""memoized attribute,initializes the revision map from the
initial collection.
"""
map_ = {}
heads = sqlautil.OrderedSet()
_real_heads = sqlautil.OrderedSet()
self.bases = ()
self._real_bases = ()
has_branch_labels = set()
for revision in self._generator():
if revision.revision in map_:
util.warn("Revision %s is present more than once" %
revision.revision)
map_[revision.revision] = revision
if revision.branch_labels:
has_branch_labels.add(revision)
heads.add(revision.revision)
_real_heads.add(revision.revision)
if revision.is_base:
self.bases += (revision.revision, )
for rev in map_.values():
for downrev in rev._all_down_revisions:
if downrev not in map_:
util.warn("Revision %s referenced from %s is not present"
% (downrev, map_)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
def _revision_map(self):
"""memoized attribute, map_branch_labels=False)
return map_
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。