Python sqlalchemy.util 模块，to_list() 实例源码
我们从 Python 开源项目中提取了以下 20 个代码示例，用于说明如何使用 sqlalchemy.util.to_list()。
def only_on(dbs, reason=None):
    """Return an ``only_if`` rule that matches any of the given databases.

    ``dbs`` is normalised with ``util.to_list``; every entry is wrapped in a
    ``SpecPredicate`` and the predicates are combined with ``OrPredicate``.

    ``reason`` is accepted but is not used by this function.
    """
    predicates = []
    for db in util.to_list(dbs):
        predicates.append(SpecPredicate(db))
    return only_if(OrPredicate(predicates))
def _compile(self, whereclause = None, **kwargs):
    """Build the select statement for this mapper and return it.

    The ordering is taken from the ``order_by`` keyword, then from
    ``self.order_by``, then from the table's ``default_order_by()``;
    ``False`` means "not specified" at each step.  When
    ``self._should_nest(**kwargs)`` is true, a primary-key select aliased as
    ``rowcount`` is built and the outer statement is joined to it on the
    primary-key columns.  Finally every entry of ``self.props`` gets a
    chance to modify the statement through its ``setup`` method.
    """
    order_by = kwargs.pop('order_by', False)
    if order_by is False:
        order_by = self.order_by
    if order_by is False and self.table.default_order_by() is not None:
        order_by = self.table.default_order_by()

    distinct = kwargs.get('distinct', False)

    if not self._should_nest(**kwargs):
        statement = sql.select([], **kwargs)
        if order_by:
            statement.order_by(*util.to_list(order_by))
        # For a DISTINCT query the ordering columns must be listed
        # explicitly among the selected columns before they can be used in
        # "order_by" (particularly the oid), so make sure they are there.
        # TODO: this should be done at the sql level, not the mapper level.
        if distinct and order_by:
            statement.append_column(*util.to_list(order_by))
    else:
        key_select = sql.select(self.table.primary_key, whereclause, use_labels=True, **kwargs)
        if not distinct and order_by:
            key_select.order_by(*util.to_list(order_by))
        rowcount = key_select.alias('rowcount')
        # Join the aliased key select back to the table, one criterion per
        # primary-key position.
        join_criteria = [
            rowcount.primary_key[position] == self.table.primary_key[position]
            for position in range(len(self.table.primary_key))
        ]
        statement = sql.select([], sql.and_(*join_criteria), from_obj=[self.table], use_labels=True)
        if order_by:
            statement.order_by(*util.to_list(order_by))

    # Plugin point: give all the attached properties a chance to modify
    # the query.
    for prop_key, prop in self.props.iteritems():
        prop.setup(prop_key, statement, **kwargs)
    return statement
def setup(self, key, recursion_stack = None, eagertable=None, **options):
"""add a left outer join to the statement thats being constructed"""
if recursion_stack is None:
recursion_stack = {}
if statement.whereclause is not None:
# "aliasize" the tables referenced in the user-defined whereclause to not
# collide with the tables used by the eager load
# note that we arent affecting the mapper's table,nor our own primary or secondary joins
aliasizer = Aliasizer(*self.to_alias)
statement.whereclause.accept_visitor(aliasizer)
for alias in aliasizer.aliases.values():
statement.append_from(alias)
if hasattr(statement, '_outerjoin'):
towrap = statement._outerjoin
else:
towrap = self.parent.table
if self.secondaryjoin is not None:
statement._outerjoin = sql.outerjoin(towrap, self.secondary, self.eagerprimary).outerjoin(self.eagertarget, self.eagersecondary)
if self.order_by is False and self.secondary.default_order_by() is not None:
statement.order_by(*self.secondary.default_order_by())
else:
statement._outerjoin = towrap.outerjoin(self.eagertarget, self.eagerprimary)
if self.order_by is False and self.eagertarget.default_order_by() is not None:
statement.order_by(*self.eagertarget.default_order_by())
if self.eager_order_by:
statement.order_by(*util.to_list(self.eager_order_by))
statement.append_from(statement._outerjoin)
recursion_stack[self] = True
try:
for key, value in self.mapper.props.iteritems():
if recursion_stack.has_key(value):
raise "Circular eager load relationship detected on " + str(self.mapper) + " " + key + repr(self.mapper.props)
value.setup(key, recursion_stack=recursion_stack, eagertable=self.eagertarget)
finally:
del recursion_stack[self]
def only_on(dbs, reason=None):
    """Build an ``only_if`` rule limited to the listed databases.

    Each entry of ``util.to_list(dbs)`` becomes a ``SpecPredicate``; the
    resulting list is wrapped in an ``OrPredicate`` and handed to
    ``only_if``.  ``reason`` is accepted but not used here.
    """
    db_predicates = list(map(SpecPredicate, util.to_list(dbs)))
    any_of = OrPredicate(db_predicates)
    return only_if(any_of)
def only_on(dbs, reason=None):
    """Return an ``only_if`` rule that matches any of the given databases.

    ``dbs`` is normalised with ``util.to_list``; every entry is converted
    with ``Predicate.as_predicate`` and the predicates are combined with
    ``OrPredicate``.

    ``reason`` is accepted but is not used by this function.
    """
    predicates = []
    for db in util.to_list(dbs):
        predicates.append(Predicate.as_predicate(db))
    return only_if(OrPredicate(predicates))
def only_on(dbs, reason=None):
    """Build an ``only_if`` rule limited to the listed databases.

    Each entry of ``util.to_list(dbs)`` is turned into a predicate with
    ``Predicate.as_predicate``; the resulting list is wrapped in an
    ``OrPredicate`` and handed to ``only_if``.  ``reason`` is accepted but
    not used here.
    """
    db_predicates = [
        Predicate.as_predicate(entry) for entry in util.to_list(dbs)
    ]
    any_of = OrPredicate(db_predicates)
    return only_if(any_of)
def only_on(dbs, reason=None):
    """Restrict to the given databases via ``only_if``.

    The value of ``dbs`` goes through ``util.to_list``; one
    ``SpecPredicate`` is created per entry and all of them are OR-ed
    together with ``OrPredicate``.  The ``reason`` argument is not used by
    this function.
    """
    specs = util.to_list(dbs)
    alternatives = [SpecPredicate(spec) for spec in specs]
    return only_if(OrPredicate(alternatives))
def _filter_or_exclude(self, negate, kwargs):
    """Apply double-underscore keyword lookups as filters and return the query.

    Every key of ``kwargs`` is split on ``'__'``.  A token read while no
    column is pending is resolved with ``_entity_descriptor``; if the result
    uses objects it is joined and the next token starts a new column.  A
    token read while a column is pending must name an entry of
    ``self._underscore_operators``: ``in`` receives the value as a single
    argument, every other operator receives ``to_list(value)`` unpacked.
    A column still pending after the last token is compared for equality
    with the value.  When ``negate`` is true each criterion is inverted
    with ``~``.

    Raises:
        ValueError: for a token that is neither ``in`` nor a key of
            ``self._underscore_operators``.
    """
    def apply_negation(criterion):
        if negate:
            return ~criterion
        return criterion

    query = self
    for lookup, value in kwargs.items():
        pending = None
        for token in lookup.split('__'):
            if pending is None:
                pending = _entity_descriptor(query._joinpoint_zero(), token)
                if pending.impl.uses_objects:
                    query = query.join(pending)
                    pending = None
                continue
            if token != 'in' and token not in self._underscore_operators:
                raise ValueError('No idea what to do with %r' % token)
            op = self._underscore_operators[token]
            operands = (value,) if token == 'in' else to_list(value)
            query = query.filter(apply_negation(op(pending, *operands)))
            pending = None
        if pending is not None:
            # No operator token followed the column: plain equality.
            query = query.filter(apply_negation(pending == value))
        query = query.reset_joinpoint()
    return query
def only_on(dbs, reason=None):
    """Restrict to the given databases via ``only_if``.

    The value of ``dbs`` goes through ``util.to_list``; each entry is
    converted with ``Predicate.as_predicate`` and all of them are OR-ed
    together with ``OrPredicate``.  The ``reason`` argument is not used by
    this function.
    """
    alternatives = list(map(Predicate.as_predicate, util.to_list(dbs)))
    return only_if(OrPredicate(alternatives))
# NOTE(review): this definition is incomplete as written.  The ``def`` line
# below breaks off inside its parameter list and runs straight into the tail
# of a ``...(*to_list(value))))`` call, so the rest of the signature, the loop
# headers and the branches that the ``else`` below pairs with are missing.
# The surviving statements match the tail of the complete
# ``_filter_or_exclude`` defined earlier in this file -- presumably the same
# function; confirm against the original project before restoring it.
def _filter_or_exclude(self, *to_list(value))))
column = None
else:
# A token that matched none of the preceding branches is rejected.
raise ValueError('No idea what to do with %r' % token)
# A column that is still pending is compared for equality with the value.
if column is not None:
q = q.filter(negate_if(column == value))
column = None
q = q.reset_joinpoint()
return q
def only_on(dbs, reason=None):
    """Return an ``only_if`` rule that matches any of the given databases.

    ``dbs`` is normalised with ``util.to_list``; every entry is converted
    with ``Predicate.as_predicate`` and the predicates are combined with
    ``OrPredicate``.

    ``reason`` is accepted but is not used by this function.
    """
    predicates = []
    for db in util.to_list(dbs):
        predicates.append(Predicate.as_predicate(db))
    return only_if(OrPredicate(predicates))
def only_on(dbs, reason=None):
    """Build an ``only_if`` rule limited to the listed databases.

    Each entry of ``util.to_list(dbs)`` is turned into a predicate with
    ``Predicate.as_predicate``; the resulting list is wrapped in an
    ``OrPredicate`` and handed to ``only_if``.  ``reason`` is accepted but
    not used here.
    """
    db_predicates = [
        Predicate.as_predicate(entry) for entry in util.to_list(dbs)
    ]
    any_of = OrPredicate(db_predicates)
    return only_if(any_of)
def only_on(dbs, reason=None):
    """Restrict to the given databases via ``only_if``.

    The value of ``dbs`` goes through ``util.to_list``; each entry is
    converted with ``Predicate.as_predicate`` and all of them are OR-ed
    together with ``OrPredicate``.  The ``reason`` argument is not used by
    this function.
    """
    alternatives = list(map(Predicate.as_predicate, util.to_list(dbs)))
    return only_if(OrPredicate(alternatives))
def only_on(dbs, reason=None):
    """Return an ``only_if`` rule that matches any of the given databases.

    ``dbs`` is normalised with ``util.to_list``; every entry is converted
    with ``Predicate.as_predicate`` and the predicates are combined with
    ``OrPredicate``.

    ``reason`` is accepted but is not used by this function.
    """
    predicates = []
    for db in util.to_list(dbs):
        predicates.append(Predicate.as_predicate(db))
    return only_if(OrPredicate(predicates))
def _possible_configs_for_cls(cls, reasons=None):
    """Return the set of configurations that ``cls`` can run against.

    Starts from ``config.Config.all_configs()`` and removes configurations
    according to these class attributes:

    * ``__unsupported_on__``: configurations matching the spec are removed.
    * ``__only_on__``: configurations not matching the spec are removed.
    * ``__requires__``: configurations for which a requirement reports
      ``matching_config_reasons`` are removed.
    * ``__prefer_requires__``: configurations failing a preferred
      requirement are removed, unless that would leave nothing.
    * ``__excluded_on__``: ``(db_spec, op, spec)`` triples; configurations
      for which the resulting ``skip_if`` rule is not enabled are removed.

    Parameters:
        cls: the class whose attributes are inspected.
        reasons: optional list; skip reasons collected from
            ``__requires__`` are appended to it.

    Returns:
        The set of remaining configuration objects.
    """
    all_configs = set(config.Config.all_configs())

    if cls.__unsupported_on__:
        spec = exclusions.db_spec(*cls.__unsupported_on__)
        # Iterate over a copy because the set is modified in the loop.
        for config_obj in list(all_configs):
            if spec(config_obj):
                all_configs.remove(config_obj)

    if getattr(cls, '__only_on__', None):
        spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
        for config_obj in list(all_configs):
            if not spec(config_obj):
                all_configs.remove(config_obj)

    if hasattr(cls, '__requires__'):
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__requires__:
                check = getattr(requirements, requirement)
                skip_reasons = check.matching_config_reasons(config_obj)
                if skip_reasons:
                    all_configs.remove(config_obj)
                    if reasons is not None:
                        reasons.extend(skip_reasons)
                    # The first failing requirement rules the config out.
                    break

    if hasattr(cls, '__prefer_requires__'):
        non_preferred = set()
        requirements = config.requirements
        for config_obj in list(all_configs):
            for requirement in cls.__prefer_requires__:
                check = getattr(requirements, requirement)
                if not check.enabled_for_config(config_obj):
                    non_preferred.add(config_obj)
        # Preferences only narrow the choice; they never empty it.
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    for db_spec, op, spec in getattr(cls, '__excluded_on__', ()):
        for config_obj in list(all_configs):
            # The operator has to be passed along with the version spec;
            # without it the spec lands in the operator position.
            if not exclusions.skip_if(
                exclusions.SpecPredicate(db_spec, op, spec)
            ).enabled_for_config(config_obj):
                all_configs.remove(config_obj)

    return all_configs
def _do_skips(cls):
    """Skip ``cls`` or switch to a configuration it can run against.

    The candidate configurations come from ``_possible_configs_for_cls``.
    When ``cls.__prefer_backends__`` is set, configurations not matching it
    are dropped unless that would leave nothing.  If the current
    configuration (``config._current``) is not among the candidates, one of
    them is installed with ``_setup_config``.

    Raises:
        SkipTest: when a callable in ``cls.__skip_if__`` returns a true
            value, or when no configuration is left for ``cls``.
    """
    reasons = []
    all_configs = _possible_configs_for_cls(cls, reasons)

    skip_checks = getattr(cls, '__skip_if__', False)
    if skip_checks:
        for c in skip_checks:
            if c():
                raise SkipTest("'%s' skipped by %s" % (
                    cls.__name__, c.__name__)
                )

    if not all_configs:
        if getattr(cls, '__backend__', False):
            msg = "'%s' unsupported for implementation '%s'" % (
                cls.__name__, cls.__only_on__)
        else:
            # One "'name(version)+driver'" entry per known configuration.
            described_configs = ",".join(
                "'%s(%s)+%s'" % (
                    config_obj.db.name,
                    ".".join(
                        str(dig) for dig in
                        config_obj.db.dialect.server_version_info),
                    config_obj.db.driver
                )
                for config_obj in config.Config.all_configs()
            )
            # The format string has three placeholders; the collected skip
            # reasons are the third argument.
            msg = "'%s' unsupported on any DB implementation %s%s" % (
                cls.__name__,
                described_configs,
                ", ".join(reasons)
            )
        raise SkipTest(msg)
    elif hasattr(cls, '__prefer_backends__'):
        non_preferred = set()
        spec = exclusions.db_spec(*util.to_list(cls.__prefer_backends__))
        for config_obj in all_configs:
            if not spec(config_obj):
                non_preferred.add(config_obj)
        # Preferences only narrow the choice; they never empty it.
        if all_configs.difference(non_preferred):
            all_configs.difference_update(non_preferred)

    if config._current not in all_configs:
        _setup_config(all_configs.pop(), cls)
def _do_skips(self, cls):
    """Raise ``SkipTest`` when ``cls`` cannot run on the current database.

    The checks are driven by these class attributes, in order:
    ``__requires__``, ``__unsupported_on__``, ``__only_on__``,
    ``__skip_if__`` and ``__excluded_on__``.

    NOTE(review): the ``__only_on__``, ``__skip_if__`` and
    ``__excluded_on__`` sections were damaged in the text this was taken
    from and have been restored from the parallel checks elsewhere in this
    file -- confirm against the original project.

    Raises:
        SkipTest: when a requirement is not enabled, the database matches
            ``__unsupported_on__``, does not match ``__only_on__``, or a
            ``__skip_if__`` callable returns a true value.
    """
    from sqlalchemy.testing import config

    unsupported_msg = "'%s' unsupported on DB implementation '%s'"

    if hasattr(cls, '__requires__'):
        for requirement in cls.__requires__:
            check = getattr(config.requirements, requirement)
            if not check.enabled:
                raise SkipTest(
                    check.reason if check.reason
                    else
                    (
                        unsupported_msg % (
                            cls.__name__, config.db.name
                        )
                    )
                )

    if cls.__unsupported_on__:
        spec = exclusions.db_spec(*cls.__unsupported_on__)
        if spec(config.db):
            raise SkipTest(
                unsupported_msg % (
                    cls.__name__, config.db.name)
            )

    if getattr(cls, '__only_on__', None):
        spec = exclusions.db_spec(*util.to_list(cls.__only_on__))
        if not spec(config.db):
            raise SkipTest(
                unsupported_msg % (
                    cls.__name__, config.db.name)
            )

    if getattr(cls, '__skip_if__', False):
        for c in getattr(cls, '__skip_if__'):
            if c():
                raise SkipTest("'%s' skipped by %s" % (
                    cls.__name__, c.__name__)
                )

    # presumably exclusions.exclude takes (db, op, spec, reason), matching
    # the (db_spec, op, spec) triples used elsewhere in this file -- verify.
    for db, op, spec in getattr(cls, '__excluded_on__', ()):
        exclusions.exclude(db, op, spec,
            "'%s' unsupported on DB %s version %s" % (
                cls.__name__, config.db.name,
                exclusions._server_version(config.db)))
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。