Python sqlalchemy 模块,join() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.join()。
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    The usual entrypoint here is the :func:`~.expression.join`
    function or the :meth:`.FromClause.join` method of any
    :class:`.FromClause` object.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()

    # Only derive the ON clause when the caller did not supply one.
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause

    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def join(self, left, right, onclause=None, isouter=False,
         base=None, **mapper_args):
    """Create an :func:`.expression.join` and map to it.

    The class and its mapping are not cached and will
    be discarded once dereferenced (as of 0.6.6).

    :param left: a mapped class or table object.
    :param right: a mapped class or table object.
    :param onclause: optional "ON" clause construct.
    :param isouter: if True, the join will be an OUTER join.
    :param base: a Python class which will be used as the
      base for the mapped class. If ``None``, the "base"
      argument specified by this :class:`.SQLSoup`
      instance's constructor will be used, which defaults to
      ``object``.
    :param mapper_args: Dictionary of arguments which will
      be passed directly to :func:`.orm.mapper`.
    :return: the result of ``self.map`` applied to the join construct.
    """
    # NOTE(review): ``left``, ``right`` and ``onclause`` were missing from
    # the listed signature although the body and docstring use them; they
    # are restored here.  When this is defined as a method, the bare name
    # ``join`` below resolves to the module-level join() function
    # (presumably sqlalchemy's), not to this method.
    j = join(left, right, onclause=onclause, isouter=isouter)
    return self.map(j, base=base, **mapper_args)
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def _create_outerjoin(cls, onclause=None):
"""Return an ``OUTER JOIN`` clause element.
The returned object is an instance of :class:`.Join`.
Similar functionality is also available via the
:meth:`~.FromClause.outerjoin()` method on any
:class:`.FromClause`.
:param left: The left side of the join.
:param right: The right side of the join.
:param onclause: Optional criterion for the ``ON`` clause,is
derived from foreign key relationships established between
left and right otherwise.
To chain joins together,use the :meth:`.FromClause.join` or
:meth:`.FromClause.outerjoin` methods on the resulting
:class:`.Join` object.
"""
return cls(left, onclause, isouter=True)
def largest_groups(cls, limit=10):
    """Return ``(group, package_count)`` pairs, largest count first.

    Counts ``member`` rows joined to ``package`` on
    ``member.table_id == package.id``, keeping only rows with a
    non-NULL ``group_id``, ``table_name == 'package'`` and packages
    that are not private and have state ``'active'``.

    :param limit: maximum number of groups to return.
    :return: list of ``(model.Group, count)`` tuples.
    """
    # ``table`` is a helper defined elsewhere; presumably it returns the
    # reflected Table for the given name - verify against the module.
    member = table('member')
    package = table('package')

    member_to_package = join(member, package,
                             member.c.table_id == package.c.id)
    criteria = and_(
        # comparing a column with None via != is how the expression
        # language spells "IS NOT NULL"; do not change to "is not".
        member.c.group_id != None,  # noqa: E711
        member.c.table_name == 'package',
        package.c.private == False,  # noqa: E712
        package.c.state == 'active',
    )
    s = (
        select([member.c.group_id, func.count(member.c.table_id)])
        .select_from(member_to_package)
        .group_by(member.c.group_id)
        .where(criteria)
        .order_by(func.count(member.c.table_id).desc())
        .limit(limit)
    )

    res_ids = model.Session.execute(s).fetchall()
    # Swap each raw group id for the corresponding Group object.
    res_groups = [
        (model.Session.query(model.Group).get(unicode(group_id)), val)
        for group_id, val in res_ids
    ]
    return res_groups
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    The usual entrypoint here is the :func:`~.expression.join`
    function or the :meth:`.FromClause.join` method of any
    :class:`.FromClause` object.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): the listed signature was ``(self, full=False)`` while
    # the body reads ``left``, ``right``, ``onclause`` and ``isouter``;
    # those parameters are restored in the order the body implies.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()

    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause

    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def process_movie_languages(self, document, movieelement, movie):
    """Build the language query for one movie and pass it on.

    Selects the ``movie_lang`` columns plus the language and audio
    codec names for the movie identified by
    ``movie['movies_movie_id']``, restricted to rows whose ``type`` is
    0, 2 or NULL, then calls ``self.process_languages(document, query)``.

    :param document: forwarded unchanged to ``process_languages``.
    :param movieelement: not used by this method.
    :param movie: mapping that provides the ``'movies_movie_id'`` key.
    """
    # Look each table up once instead of repeating the dictionary access.
    movie_lang = db.Metadata.tables['movie_lang']
    languages = db.Metadata.tables['languages']
    acodecs = db.Metadata.tables['acodecs']

    # build query for movie languages processing
    languagecolumns = [
        movie_lang.c.movie_id,
        movie_lang.c.type,
        movie_lang.c.lang_id,
        movie_lang.c.acodec_id,
        languages.c.name,
        acodecs.c.name,
    ]
    language_join = join(
        movie_lang,
        languages,
        movie_lang.c.lang_id == languages.c.lang_id)
    acodec_join = language_join.join(
        acodecs,
        movie_lang.c.acodec_id == acodecs.c.acodec_id)

    # ``== None`` is deliberate: on a column it builds an SQL NULL test.
    type_filter = or_(
        movie_lang.c.type == 0,
        or_(movie_lang.c.type == 2,
            movie_lang.c.type == None))  # noqa: E711
    languagesquery = select(
        bind=self.db.session.bind,
        columns=languagecolumns,
        from_obj=[language_join, acodec_join],
        whereclause=and_(
            movie_lang.c.movie_id == movie['movies_movie_id'],
            type_filter),
        use_labels=True)

    self.process_languages(document, languagesquery)
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def __init__(self, left, right, onclause=None, isouter=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
def __init__(self, left, right, onclause=None, isouter=False, full=False):
    """Construct a new :class:`.Join`.

    :param left: left side of the join; passed through
      ``_interpret_as_from``.
    :param right: right side of the join; passed through
      ``_interpret_as_from`` and then ``self_group()``.
    :param onclause: ON criterion; when ``None`` it is computed by
      ``self._match_primaries(self.left, self.right)``.
    :param isouter: stored on the instance as ``isouter``.
    :param full: stored on the instance as ``full``.
    """
    # NOTE(review): this listing was truncated to
    # ``def __init__(self, self.right)``; the signature and leading
    # statements are reconstructed from the surviving tail - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.left = _interpret_as_from(left)
    self.right = _interpret_as_from(right).self_group()
    if onclause is None:
        self.onclause = self._match_primaries(self.left, self.right)
    else:
        self.onclause = onclause
    self.isouter = isouter
    self.full = full
def prefix_with(self, *expr, **kw):
    r"""Add one or more expressions following the statement keyword, i.e.
    SELECT, INSERT, UPDATE, or DELETE.

    This is used to support backend-specific prefix keywords such as those
    provided by MySQL.

    E.g.::

        stmt = table.insert().prefix_with("LOW_PRIORITY", dialect="mysql")

    Multiple prefixes can be specified by multiple calls
    to :meth:`.prefix_with`.

    :param \*expr: textual or :class:`.ClauseElement` construct which
     will be rendered following the INSERT, UPDATE, or DELETE
     keyword.
    :param \**kw: A single keyword 'dialect' is accepted.  This is an
     optional string dialect name which will
     limit rendering of this prefix to only that dialect.
    :raises exc.ArgumentError: if any keyword other than ``dialect``
     is passed.
    """
    # NOTE(review): the original docstring calls this "Generative"; as
    # listed the body returns None, so any copy-on-call behaviour must
    # come from a decorator that is not visible here - confirm.
    dialect = kw.pop('dialect', None)
    if kw:
        # Anything left after removing 'dialect' is unsupported.
        raise exc.ArgumentError("Unsupported argument(s): %s" %
                                ",".join(kw))
    self._setup_prefixes(expr, dialect)
def suffix_with(self, *expr, **kw):
    r"""Add one or more expressions following the statement as a whole.

    This is used to support backend-specific suffix keywords on
    certain constructs.

    E.g.::

        stmt = select([col1, col2]).cte().suffix_with(
            "cycle empno set y_cycle to 1 default 0", dialect="oracle")

    Multiple suffixes can be specified by multiple calls
    to :meth:`.suffix_with`.

    :param \*expr: textual or :class:`.ClauseElement` construct which
     will be rendered following the target clause.
    :param \**kw: A single keyword 'dialect' is accepted.  This is an
     optional string dialect name which will
     limit rendering of this suffix to only that dialect.
    :raises exc.ArgumentError: if any keyword other than ``dialect``
     is passed.
    """
    # NOTE(review): the listing had lost ``*expr`` from the signature and
    # fused the ``kw.pop`` default with the error-message line; both are
    # restored to mirror ``prefix_with`` - confirm against upstream.
    dialect = kw.pop('dialect', None)
    if kw:
        # Anything left after removing 'dialect' is unsupported.
        raise exc.ArgumentError("Unsupported argument(s): %s" %
                                ",".join(kw))
    self._setup_suffixes(expr, dialect)
def join(self, right, onclause=None, isouter=False):
    """Return a :class:`.Join` from this :class:`.FromClause`
    to another :class:`FromClause`.

    E.g.::

        from sqlalchemy import join

        j = user_table.join(address_table,
                        user_table.c.id == address_table.c.user_id)
        stmt = select([user_table]).select_from(j)

    would emit SQL along the lines of::

        SELECT user.id, user.name FROM user
        JOIN address ON user.id = address.user_id

    :param right: the right side of the join; this is any
     :class:`.FromClause` object such as a :class:`.Table` object, and
     may also be a selectable-compatible object such as an ORM-mapped
     class.

    :param onclause: a SQL expression representing the ON clause of the
     join.  If left at ``None``, :meth:`.FromClause.join` will attempt to
     join the two tables based on a foreign key relationship.

    :param isouter: if True, render a LEFT OUTER JOIN, instead of JOIN.

    .. seealso::

        :func:`.join` - standalone function

        :class:`.Join` - the type of object produced
    """
    # NOTE(review): ``right`` and ``onclause`` were missing from both the
    # listed signature and the Join(...) call although the docstring
    # documents them; restored here.
    return Join(self, right, onclause, isouter)
def outerjoin(self, right, onclause=None):
    """Return a :class:`.Join` from this :class:`.FromClause`
    to another :class:`FromClause`, with the "isouter" flag set to
    True.

    E.g.::

        from sqlalchemy import outerjoin

        j = user_table.outerjoin(address_table,
                        user_table.c.id == address_table.c.user_id)

    The above is equivalent to::

        j = user_table.join(
            address_table,
            user_table.c.id == address_table.c.user_id,
            isouter=True)

    :param right: the right side of the join; this is any
     :class:`.FromClause` object such as a :class:`.Table` object.

    :param onclause: expression for the ON clause of the join.  If left
     at ``None``, :meth:`.FromClause.join` will attempt to
     join the two tables based on a foreign key relationship.

    .. seealso::

        :meth:`.FromClause.join`

        :class:`.Join`
    """
    # NOTE(review): ``right`` was missing from the listed signature and
    # ``right, onclause`` from the Join(...) call; restored so the
    # example in the docstring is callable.  The literal True is the
    # ``isouter`` flag.
    return Join(self, right, onclause, True)
def _create_join(cls, isouter=False):
"""Produce a :class:`.Join` object,given two :class:`.FromClause`
expressions.
E.g.::
j = join(user_table,address_table,
user_table.c.id == address_table.c.user_id)
stmt = select([user_table]).select_from(j)
would emit sql along the lines of::
SELECT user.id,user.name FROM user
JOIN address ON user.id = address.user_id
Similar functionality is available given any
:class:`.FromClause` object (e.g. such as a :class:`.Table`) using
the :meth:`.FromClause.join` method.
:param left: The left side of the join.
:param right: the right side of the join; this is any
:class:`.FromClause` object such as a :class:`.Table` object,instead of JOIN.
.. seealso::
:meth:`.FromClause.join` - method form,based on a given left side
:class:`.Join` - the type of object produced
"""
return cls(left, isouter)
def _joincond_trim_constraints(
cls, a, b, constraints, consider_as_foreign_keys):
# more than one constraint matched. narrow down the list
# to include just those FKCs that match exactly to
# "consider_as_foreign_keys".
if consider_as_foreign_keys:
for const in list(constraints):
if set(f.parent for f in const.elements) != set(
consider_as_foreign_keys):
del constraints[const]
# if still multiple constraints,but
# they all refer to the exact same end result,use it.
if len(constraints) > 1:
dedupe = set(tuple(crit) for crit in constraints.values())
if len(dedupe) == 1:
key = list(constraints)[0]
constraints = {key: constraints[key]}
if len(constraints) != 1:
raise exc.AmbiguousForeignKeysError(
"Can't determine join between '%s' and '%s'; "
"tables have more than one foreign key "
"constraint relationship between them. "
"Please specify the 'onclause' of this "
"join explicitly." % (a.description, b.description))
def select_from(self, fromclause):
    """Merge the given FROM expression into this construct's list of
    FROM objects.

    E.g.::

        table1 = table('t1', column('a'))
        table2 = table('t2', column('b'))
        s = select([table1.c.a]).\\
            select_from(
                table1.join(table2, table1.c.a == table2.c.b)
            )

    The "from" list is a unique set on the identity of each element,
    so adding an already present :class:`.Table` or other selectable
    will have no effect.   Passing a :class:`.Join` that refers
    to an already present :class:`.Table` or other selectable will have
    the effect of concealing the presence of that selectable as
    an individual element in the rendered FROM list, instead
    rendering it into a JOIN clause.

    While the typical purpose of :meth:`.Select.select_from` is to
    replace the default, derived FROM clause with a join, it can
    also be called with individual table elements, multiple times
    if desired, in the case that the FROM clause cannot be fully
    derived from the columns clause::

        select([func.count('*')]).select_from(table1)

    :param fromclause: FROM expression handed to ``self.append_from``.
    """
    # NOTE(review): the original summary says "return a new select
    # construct"; as listed the body returns None, so that behaviour
    # presumably comes from a decorator not visible here - confirm.
    self.append_from(fromclause)
def select_from(self, fromclause):
    """Merge the given FROM expression into this construct's list of
    FROM objects.

    It can be called with individual table elements, multiple times
    if desired, in the case that the FROM clause cannot be fully
    derived from the columns clause::

        select([func.count('*')]).select_from(table1)

    :param fromclause: FROM expression handed to ``self.append_from``.
    """
    # NOTE(review): this listing was truncated (the ``def`` line was
    # fused with docstring text); the signature is reconstructed from
    # the surviving ``self.append_from(fromclause)`` call - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.append_from(fromclause)
def _selectable_name(selectable):
    """Derive a name string for ``selectable``.

    * ``sql.Alias``: the name of the aliased ``element``.
    * ``sql.Select``: the names of all entries in ``froms``, concatenated.
    * ``schema.Table``: the table name, capitalized.
    * anything else: the class name, minus one leading underscore.

    :param selectable: object to name.
    :return: the derived name as a string.
    """
    if isinstance(selectable, sql.Alias):
        return _selectable_name(selectable.element)
    if isinstance(selectable, sql.Select):
        return ''.join(_selectable_name(s) for s in selectable.froms)
    if isinstance(selectable, schema.Table):
        return selectable.name.capitalize()

    class_name = selectable.__class__.__name__
    # Strip at most one leading underscore from private class names.
    if class_name.startswith('_'):
        return class_name[1:]
    return class_name
def select_from(self, fromclause):
    """Merge the given FROM expression into this construct's list of
    FROM objects.

    It can be called with individual table elements, multiple times
    if desired, in the case that the FROM clause cannot be fully
    derived from the columns clause::

        select([func.count('*')]).select_from(table1)

    :param fromclause: FROM expression handed to ``self.append_from``.
    """
    # NOTE(review): this listing was truncated (the ``def`` line was
    # fused with docstring text); the signature is reconstructed from
    # the surviving ``self.append_from(fromclause)`` call - confirm
    # against the SQLAlchemy release this sample was taken from.
    self.append_from(fromclause)
def top_rated_packages(cls, limit=10):
    """Return ``(package, average_rating, rating_count)`` tuples.

    Selects package id, ``avg(rating)`` and ``count(rating)`` over
    ``package`` joined to ``rating``, keeping packages that are not
    private and have state ``'active'``, ordered by average rating and
    then by rating count, both descending.

    :param limit: maximum number of packages to return.
    :return: list of ``(model.Package, avg, num)`` tuples.
    """
    # NB Not using sqlalchemy as sqla 0.4 doesn't work using both group_by
    # and apply_avg
    # NOTE(review): the remark above looks stale - the query below is
    # built with the SQLAlchemy expression API; confirm and drop.
    package = table('package')
    rating = table('rating')
    sql = (
        select(
            [package.c.id,
             func.avg(rating.c.rating),
             func.count(rating.c.rating)],
            from_obj=[package.join(rating)])
        .where(and_(package.c.private == False,  # noqa: E712
                    package.c.state == 'active'))
        .group_by(package.c.id)
        .order_by(func.avg(rating.c.rating).desc(),
                  func.count(rating.c.rating).desc())
        .limit(limit)
    )
    res_ids = model.Session.execute(sql).fetchall()
    # Each row has three columns (id, avg, count); the previous listing
    # unpacked only two names and then used an undefined ``avg``.
    res_pkgs = [
        (model.Session.query(model.Package).get(unicode(pkg_id)), avg, num)
        for pkg_id, avg, num in res_ids
    ]
    return res_pkgs
def most_edited_packages(cls, limit=10):
    """Return ``(package, revision_count)`` pairs, most revisions first.

    Counts ``package_revision.revision_id`` per ``package_revision.id``
    over ``package_revision`` joined to ``package``, keeping packages
    that are not private and have state ``'active'``.

    :param limit: maximum number of packages to return.
    :return: list of ``(model.Package, count)`` tuples.
    """
    package_revision = table('package_revision')
    package = table('package')

    revision_count = [package_revision.c.id,
                      func.count(package_revision.c.revision_id)]
    s = (
        select(revision_count,
               from_obj=[package_revision.join(package)])
        .where(and_(package.c.private == False,  # noqa: E712
                    package.c.state == 'active'))
        .group_by(package_revision.c.id)
        .order_by(func.count(package_revision.c.revision_id).desc())
        .limit(limit)
    )

    res_ids = model.Session.execute(s).fetchall()
    # Swap each raw package id for the corresponding Package object.
    res_pkgs = [
        (model.Session.query(model.Package).get(unicode(pkg_id)), val)
        for pkg_id, val in res_ids
    ]
    return res_pkgs
def top_tags(cls, limit=10, returned_tag_info='object'):  # by package
    """Return the tags used by the most packages.

    Counts ``package_tag.package_id`` per tag over ``package_tag``
    joined to ``package``, keeping active package-tag rows on packages
    that are not private and have state ``'active'``.

    :param limit: maximum number of tags to return.
    :param returned_tag_info: ``'name'``, ``'id'`` or ``'object'``;
      selects what the first item of each result pair is.
    :return: for ``'id'``/``'name'`` the raw result rows
      ``(tag_id_or_name, count)``; for ``'object'`` a list of
      ``(model.Tag, count)`` tuples.
    """
    assert returned_tag_info in ('name', 'id', 'object')
    tag = table('tag')
    package_tag = table('package_tag')
    package = table('package')

    if returned_tag_info == 'name':
        # Tag names live on the ``tag`` table, so it has to be joined in.
        from_obj = [package_tag.join(tag)]
        tag_column = tag.c.name
    else:
        from_obj = None
        tag_column = package_tag.c.tag_id

    # The right-hand table ``package`` was missing from this call in the
    # listing, which left the ON expression in the wrong argument slot.
    j = join(package_tag, package,
             package_tag.c.package_id == package.c.id)
    s = select([tag_column, func.count(package_tag.c.package_id)],
               from_obj=from_obj).\
        select_from(j).\
        where(and_(package_tag.c.state == 'active',
                   package.c.private == False,  # noqa: E712
                   package.c.state == 'active'))
    s = s.group_by(tag_column).\
        order_by(func.count(package_tag.c.package_id).desc()).\
        limit(limit)

    res_col = model.Session.execute(s).fetchall()
    if returned_tag_info in ('id', 'name'):
        return res_col
    elif returned_tag_info == 'object':
        # Swap each raw tag id for the corresponding Tag object.
        res_tags = [
            (model.Session.query(model.Tag).get(unicode(tag_id)), val)
            for tag_id, val in res_col
        ]
        return res_tags
def get_package_revisions(cls):
    """Return package revisions with their timestamps, oldest first.

    :return: list of rows in the format ``[(id, date), ...]`` where
      ``id`` is ``package_revision.id`` and ``date`` is
      ``revision.timestamp``.
    """
    package_revision = table('package_revision')
    revision = table('revision')

    columns = [package_revision.c.id, revision.c.timestamp]
    s = select(columns, from_obj=[package_revision.join(revision)])
    s = s.order_by(revision.c.timestamp)

    res = model.Session.execute(s).fetchall()  # [(id, datetime), ...]
    return res
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。