Python sqlalchemy 模块，func() 实例源码
我们从 Python 开源项目中提取了以下 28 个代码示例，用于说明如何使用 sqlalchemy.func()。
def _convertFieldOrFunction(self, fieldOrFunction, preferValue=False):
    """
    Resolve a field/function specification into a column, function call or
    plain value.

    A non-dictionary is treated as a column name on ``self.tableClass``
    unless ``preferValue`` is set.  A dictionary is inspected, in order, for
    a ``'field'`` key (column lookup), a ``'value'`` key (literal value) and
    finally a function description, which is canonicalized through
    ``self.isFunction`` and must be permitted by ``self._isFunctionAllowed``.

    :param fieldOrFunction: a string with a column name or a dictionary
        with either a field, function, or value.
    :param preferValue: if True, non-dictionary input and ``'value'``
        entries are returned as bare values instead of being converted to
        a column reference or a SQL literal.
    :returns: a constructed column or function object, or a bare value.
    :raises DatabaseConnectorException: if the dictionary is not a function
        description or names a function that is not allowed.
    """
    spec = fieldOrFunction

    # Plain (non-dict) input: either pass through or look up a column.
    if not isinstance(spec, dict):
        return spec if preferValue else getattr(self.tableClass, spec)

    if 'field' in spec:
        return getattr(self.tableClass, spec['field'])

    if 'value' in spec:
        if preferValue:
            return spec['value']
        return sqlalchemy.sql.elements.literal(spec['value'])

    # Anything else has to be a function description.
    spec = self.isFunction(spec)
    if spec is False:
        raise DatabaseConnectorException('Not a function')
    if not self._isFunctionAllowed(spec['func']):
        raise DatabaseConnectorException(
            'Function %s is not allowed' % spec['func'])

    args = spec.get('param', spec.get('params', []))

    # 'distinct' and 'cast' are looked up on the sqlalchemy package itself;
    # every other function name is resolved through sqlalchemy.func.
    if spec['func'] not in ('distinct', 'cast'):
        builder = getattr(sqlalchemy.func, spec['func'])
    else:
        isTypedCast = (
            spec['func'] == 'cast' and
            len(args) == 2 and
            isinstance(args[1], dict) and
            'value' in args[1])
        if isTypedCast:
            # Swap a known type name for its entry in self.types; unknown
            # names are left untouched.  This rewrites the dict in place.
            args[1]['value'] = self.types.get(
                args[1]['value'], args[1]['value'])
        builder = getattr(sqlalchemy, spec['func'])

    # Arguments are converted recursively, keeping bare values as values.
    converted = [self._convertFieldOrFunction(arg, True) for arg in args]
    return builder(*converted)
def make_shell_context():
    """Build the namespace dictionary exposed to the interactive shell.

    The imports are done lazily inside the function.  The returned mapping
    contains the application and database objects, a few debugging helpers
    (``pprint``, ``gq`` for ``get_debug_queries``, ``func``, ``desc``) and
    the model classes from ``scrobbler.models``.
    """
    from pprint import pprint

    from flask_sqlalchemy import get_debug_queries
    from sqlalchemy import desc, func

    from scrobbler.models import (
        Album,
        Artist,
        NowPlaying,
        Scrobble,
        Session,
        Token,
        User,
    )

    shell_namespace = {
        # Application objects and debugging helpers.
        'app': app,
        'db': db,
        'pprint': pprint,
        'gq': get_debug_queries,
        'func': func,
        'desc': desc,
        # Model classes.
        'User': User,
        'Session': Session,
        'Token': Token,
        'Scrobble': Scrobble,
        'NowPlaying': NowPlaying,
        'Artist': Artist,
        'Album': Album,
    }
    return shell_namespace
def tag_show(context, data_dict):
    '''Return the details of a tag and, optionally, its datasets.

    The looked-up tag (or ``None``) is stored in ``context['tag']`` before
    the existence and access checks run.

    :param id: the name or id of the tag
    :type id: string
    :param vocabulary_id: the id or name of the tag vocabulary that the tag
        is in - if it is not specified it will assume it is a free tag.
        (optional)
    :type vocabulary_id: string
    :param include_datasets: include a list of the tag's datasets. (Up to a
        limit of 1000 - for more flexibility, use package_search - see
        :py:func:`package_search` for an example.)
        (optional, default: ``False``)
    :type include_datasets: bool

    :returns: the details of the tag, including a list of all of the tag's
        datasets and their details
    :rtype: dictionary
    :raises NotFound: if no matching tag exists
    '''
    model = context['model']
    tag_ref = _get_or_bust(data_dict, 'id')
    with_datasets = asbool(data_dict.get('include_datasets', False))
    vocabulary = data_dict.get('vocabulary_id')

    found_tag = model.Tag.get(tag_ref, vocab_id_or_name=vocabulary)
    context['tag'] = found_tag

    if found_tag is None:
        raise NotFound

    # The access check runs only once the tag is known to exist.
    _check_access('tag_show', context, data_dict)

    return model_dictize.tag_dictize(
        found_tag, include_datasets=with_datasets)
def config_option_show(context, data_dict):
    '''Show the current value of a particular configuration option.

    Only returns runtime-editable config options (the ones returned by
    :py:func:`~ckan.logic.action.get.config_option_list`), which can be
    updated with the
    :py:func:`~ckan.logic.action.update.config_option_update` action.

    :param key: The configuration option key
    :type key: string

    :returns: The value of the config option from either the system_info
        table or ini file.
    :rtype: string

    :raises: :class:`ckan.logic.ValidationError`: if config option is not in
        the schema (whitelisted as editable).
    '''
    # The access check needs the request context as well as the data dict.
    _check_access('config_option_show', context, data_dict)

    key = _get_or_bust(data_dict, 'key')
    schema = ckan.logic.schema.update_configuration_schema()

    # Only return whitelisted keys
    if key not in schema:
        raise ValidationError(
            'Configuration option \'{0}\' can not be shown'.format(key))

    # Return the value from config (None when the key is not set).
    return config.get(key, None)
def config_option_list(context, data_dict):
    '''Return a list of runtime-editable config options keys that can be
    updated with :py:func:`~ckan.logic.action.update.config_option_update`.

    :returns: A list of config option keys.
    :rtype: list
    '''
    # The access check needs the request context as well as the data dict.
    _check_access('config_option_list', context, data_dict)

    schema = ckan.logic.schema.update_configuration_schema()

    # Materialize the keys so that callers get a real list, as documented,
    # rather than a dictionary view.
    return list(schema.keys())
def Any(other, arrexpr, operator=operators.eq):
    """Legacy alias for the :meth:`.ARRAY.Comparator.any` method.

    Kept only for backwards-compatibility; it forwards ``other`` and
    ``operator`` to ``arrexpr.any``.

    .. seealso::

        :func:`.expression.any_`

    """
    any_clause = arrexpr.any(other, operator)
    return any_clause
def All(other, arrexpr, operator=operators.eq):
    """A synonym for the :meth:`.ARRAY.Comparator.all` method.

    This method is legacy and is here for backwards-compatibility.

    :param other: expression to be compared
    :param arrexpr: the array expression whose ``all`` method is invoked
    :param operator: comparison operator, defaults to ``operators.eq``

    .. seealso::

        :func:`.expression.all_`

    """
    # The array expression has to be an explicit argument: the body
    # previously referred to an undefined name and raised NameError.
    return arrexpr.all(other, operator)
def any(self, other, operator=operators.eq):
    """Build an ``other operator ANY (array)`` clause.

    The operands appear swapped because ANY needs the array expression
    on the right-hand side.

    E.g.::

        from sqlalchemy.sql import operators

        conn.execute(
            select([table.c.data]).where(
                    table.c.data.any(7, operator=operators.lt)
                )
        )

    :param other: expression to be compared
    :param operator: an operator object from the
     :mod:`sqlalchemy.sql.operators`
     package, defaults to :func:`.operators.eq`.

    .. seealso::

        :class:`.postgresql.Any`

        :meth:`.postgresql.ARRAY.Comparator.all`

    """
    array_expression = self.expr
    return Any(other, array_expression, operator=operator)
def all(self, other, operator=operators.eq):
    """Return ``other operator ALL (array)`` clause.

    Argument places are switched, because ALL requires array
    expression to be on the right hand-side.

    E.g.::

        from sqlalchemy.sql import operators

        conn.execute(
            select([table.c.data]).where(
                    table.c.data.all(7, operator=operators.lt)
                )
        )

    :param other: expression to be compared
    :param operator: an operator object from the
     :mod:`sqlalchemy.sql.operators`
     package, defaults to :func:`.operators.eq`.

    .. seealso::

        :class:`.postgresql.All`

        :meth:`.postgresql.ARRAY.Comparator.any`

    """
    # ``other`` must be a parameter (it was referenced without being
    # defined), and the array expression is forwarded the same way the
    # sibling ``any`` comparator forwards it.
    return All(other, self.expr, operator=operator)
def visit_substring_func(self, func, **kw):
    """Render a substring function call in ``SUBSTRING(x FROM y [FOR z])`` form.

    The first two clauses of ``func`` are the source expression and the
    start position; a third clause, when present, is the length.  Each
    clause is rendered with ``self.process``.
    """
    args = func.clauses.clauses
    source = self.process(args[0], **kw)
    begin = self.process(args[1], **kw)

    # Without a length argument the FOR part is left out.
    if len(args) <= 2:
        return "SUBSTRING(%s FROM %s)" % (source, begin)

    count = self.process(args[2], **kw)
    return "SUBSTRING(%s FROM %s FOR %s)" % (source, begin, count)
def journal_read(session, func):
    """Read, process and delete (if successful) the oldest journal row.

    The row is locked on read, and remains locked until the processing
    succeeds or gives up.  This is to ensure that (in a multithreaded
    or multiprocess environment) only one worker processes the update,
    which in turn ensures monotonicity.

    :param session: database session used for the query and the
        surrounding transaction.
    :param func: callable invoked as ``func(entry.k, entry.v)``; a truthy
        result means the entry was handled and may be deleted.
    :returns: False when the journal table was empty, True otherwise
        (i.e. there may be more entries to process).
    """
    # Find and lock the oldest journal entry.
    # NB this will serialise on locking the oldest record if there are
    # multiple threads running (as there may be if multiple processes
    # are running).
    maybe_more = True

    with session.begin():
        # Note also that this will, if the other thread succeeds, lock a
        # row that someone else deletes, so its session will abort.
        entry = (session.query(VppEtcdJournal)
                 .order_by(VppEtcdJournal.id)
                 .with_for_update()
                 .first())

        if entry:
            if func(entry.k, entry.v):  # ... can quite reasonably fail...
                # Once done, it should go.
                session.delete(entry)
            else:
                # For some reason, we can't do the job.
                entry.retries = entry.retries + 1
                # The ORM session has no update() method; add() keeps the
                # modified row attached so the new retry count is written
                # when the transaction commits.
                session.add(entry)
        else:
            # The table is empty - no work available.
            maybe_more = False

    # Report the flag to the caller; it was previously computed and dropped.
    return maybe_more
def package_relationships_list(context, data_dict):
    '''Return a dataset (package)'s relationships.

    :param id: the id or name of the first package
    :type id: string
    :param id2: the id or name of the second package (optional)
    :type id2: string
    :param rel: relationship as string see
        :py:func:`~ckan.logic.action.create.package_relationship_create` for
        the relationship types (optional)

    :rtype: list of dictionaries
    :raises NotFound: if either named package does not exist, or if a
        specific relationship type was requested and none was found
    '''
    # TODO: needs to work with dictization layer
    model = context['model']
    api = context.get('api_version')

    id = _get_or_bust(data_dict, "id")
    id2 = data_dict.get("id2")
    rel = data_dict.get("rel")

    # API version 2 refers to packages by id, every other version by name.
    ref_package_by = 'id' if api == 2 else 'name'

    pkg1 = model.Package.get(id)
    pkg2 = None
    if not pkg1:
        raise NotFound('First package named in request was not found.')
    if id2:
        pkg2 = model.Package.get(id2)
        if not pkg2:
            raise NotFound('Second package named in address was not found.')

    # 'relationships' is treated as "no particular relationship type".
    if rel == 'relationships':
        rel = None

    # The access check needs the request context as well as the data dict.
    _check_access('package_relationships_list', context, data_dict)

    # TODO: How to handle this object level authz?
    # Currently we don't care
    relationships = pkg1.get_relationships(with_package=pkg2, type=rel)

    if rel and not relationships:
        raise NotFound('Relationship "%s %s %s" not found.'
                       % (id, rel, id2))

    # A separate loop name avoids shadowing the ``rel`` filter above.
    return [
        relationship.as_dict(pkg1, ref_package_by=ref_package_by)
        for relationship in relationships]
版权声明：本文内容由互联网用户自发贡献，该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务，不拥有所有权，不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容，请发送邮件至 dio@foxmail.com 举报，一经查实，本站将立刻删除。