Python sqlalchemy 模块:sqlalchemy.exc 实例源码
我们从 Python 开源项目中提取了以下 18 个代码示例,用于说明如何使用 sqlalchemy.exc 模块。
def _retry_on_exceptions(exc):
    """Tell whether a failed database call should be retried.

    The result is truthy only when ``exc`` is an ``exception.DBError`` whose
    ``inner_exception`` is a ``sqlalchemy.exc.InternalError`` and the
    driver-level error (``inner_exception.orig``) is one of:

    * a PyMySQL ``InternalError`` whose first argument equals
      ``ER.TABLE_DEF_CHANGED``;
    * a psycopg2 ``InternalError`` with ``pgcode`` ``'25P02'``.

    :param exc: The exception raised by the wrapped call.
    :returns: A truthy value if the call should be retried, falsy otherwise.
    """
    if not isinstance(exc, exception.DBError):
        return False
    inn_e = exc.inner_exception
    if not isinstance(inn_e, sqlalchemy.exc.InternalError):
        return False
    return ((
        # NOTE(review): ``pymysql`` is presumably bound to None at module
        # level when the driver is not installed -- confirm against the
        # file's import section.
        pymysql and
        isinstance(inn_e.orig, pymysql.err.InternalError) and
        (inn_e.orig.args[0] == pymysql.constants.ER.TABLE_DEF_CHANGED)
    ) or (
        # HACK(jd) Sometimes, PostgreSQL raises an error such as "current
        # transaction is aborted, commands ignored until end of transaction
        # block" on its own catalog, so we need to retry, but this is not
        # caught by oslo.db as a deadlock. This is likely because when we use
        # Base.metadata.create_all(), sqlalchemy itself gets an error it does
        # not catch or something. So this is why this function exists. To
        # paper over it, I guess.
        psycopg2
        and isinstance(inn_e.orig, psycopg2.InternalError)
        # current transaction is aborted
        and inn_e.orig.pgcode == '25P02'
    ))
def create_lock(context, lock_key, session):
    """Try to create a lock record, or take over an old one.

    If no ``ArtifactLock`` row exists for ``lock_key`` a new one is saved.
    If a row exists and ``timeutils.is_older_than(acquired_at, 5)`` is true,
    its ``acquired_at`` is refreshed and the lock is reused.

    :param context: Request context (not used by this function).
    :param lock_key: Identifier of the lock; stored as the row id.
    :param session: Database session used for the whole operation.
    :returns: The id of the created or re-acquired lock.
    :raises exception.Conflict: If the lock is currently held, or if saving
        the new row fails with an integrity/duplicate-entry error.
    """
    conflict_msg = _("Cannot lock an item with key %s. "
                     "Lock already acquired by other request") % lock_key

    with session.begin():
        existing = session.query(models.ArtifactLock).get(lock_key)

        if existing is None:
            try:
                lock = models.ArtifactLock()
                lock.id = lock_key
                lock.save(session=session)
                return lock.id
            except (sqlalchemy.exc.IntegrityError,
                    db_exception.DBDuplicateEntry):
                # A concurrent request inserted the same key first.
                raise exception.Conflict(conflict_msg)

        if timeutils.is_older_than(existing.acquired_at, 5):
            # The existing lock is old enough to be taken over.
            existing.acquired_at = timeutils.utcnow()
            existing.save(session)
            return existing.id

        raise exception.Conflict(conflict_msg)
def _pre_upgrade_001(self, engine):
    """Assert that none of the glare tables exist before migration 001.

    :param engine: Engine passed through to ``db_utils.get_table``.
    """
    # assertRaises needs the callable and its arguments on every call;
    # passing only a table name would make it try to call the string.
    for table_name in ('glare_artifacts',
                       'glare_artifact_tags',
                       'glare_artifact_properties',
                       'glare_artifact_blobs',
                       'glare_artifact_locks'):
        self.assertRaises(sqlalchemy.exc.NoSuchTableError,
                          db_utils.get_table, engine,
                          table_name)
def configure_authentication(self):
    """Install authentication/authorization policies and ``request.user``.

    Registers a session authentication policy and an ACL authorization
    policy on ``self.config``, adds the session invalidation tween, scans
    the auth subscribers module and exposes the current user as a request
    property.

    For more information see Pyramid auth documentation.
    """
    # Kept from the original; only the disabled tween registration used it.
    import pyramid.tweens
    from websauna.system.auth.principals import resolve_principals
    from websauna.system.auth.authentication import get_request_user
    from pyramid.authorization import ACLAuthorizationPolicy
    from websauna.system.auth.policy import SessionAuthenticationPolicy

    authentication = SessionAuthenticationPolicy(callback=resolve_principals)
    authorization = ACLAuthorizationPolicy()
    self.config.set_authentication_policy(authentication)
    self.config.set_authorization_policy(authorization)

    # The tween must sit above the TM view but below the exception view, so
    # that the internal server error page does not trigger session
    # authentication that accesses the database.
    self.config.add_tween(
        "websauna.system.auth.tweens.SessionInvalidationTweenFactory",
        under="pyramid_tm.tm_tween_factory",
    )

    # Pick up the "incoming auth details changed" event subscribers.
    from websauna.system.auth import subscribers
    self.config.scan(subscribers)

    # Experimental: use a transaction-aware property when pyramid_tm offers
    # one, otherwise fall back to a plain reified request method.
    try:
        from pyramid_tm.reify import transaction_aware_reify
        user_property = transaction_aware_reify(self.config, get_request_user)
        self.config.add_request_method(
            callable=user_property,
            name="user",
            property=True,
            reify=False,
        )
    except ImportError:
        self.config.add_request_method(get_request_user, 'user', reify=True)
def sanity_check(self):
    """Perform post-initialization sanity checks.

    This is run on every startup to check that the database table schema
    matches our model definitions. If there are un-run migrations this will
    bail out instead of letting the problem escalate later. When Redis
    sessions are enabled, the Redis connection is checked as well.

    See also: :ref:`websauna.sanity_check`.

    :raises SanityCheckFailed: If the database schema check fails, the
        database raises ``OperationalError``, or the Redis check fails.
    """
    import sqlalchemy.exc
    from websauna.system.model import sanitycheck
    from websauna.system.model.meta import Base
    from websauna.system.model.meta import create_dbsession
    from websauna.system.core import redis

    dbsession = create_dbsession(self.config.registry)
    db_connection_string = self.config.registry.settings.get("sqlalchemy.url")
    try:
        if not sanitycheck.is_sane_database(Base, dbsession):
            raise SanityCheckFailed("The database sanity check Failed. Check log for details.")
    except sqlalchemy.exc.OperationalError as e:
        raise SanityCheckFailed("The database {} is not responding.\nMake sure the database is running on your local computer or correctly configured in settings INI file.\nFor more information see https://websauna.org/docs/tutorials/gettingstarted/tutorial_02.html.".format(db_connection_string)) from e
    finally:
        # Release the session on the failure paths too, not only on success.
        dbsession.close()

    if self._has_redis_sessions:
        if not redis.is_sane_redis(self.config):
            raise SanityCheckFailed("Could not connect to Redis server.\nWebsauna is configured to use Redis server for session data.\nIt cannot start up without a running Redis server.\nPlease consult your operating system community how to install and start a Redis server.")
def del_rse(rse, session=None):
    """
    Disable a RSE with the given RSE name.

    Deletes the matching ``models.RSE`` row through its ``delete`` method and
    then removes the RSE attribute whose key is the RSE name.

    :param rse: the rse name.
    :param session: The database session in use.
    :raises RSENotFound: If no RSE with that name exists.
    """
    try:
        old_rse = session.query(models.RSE).filter_by(rse=rse).one()
    except sqlalchemy.orm.exc.NoResultFound:
        raise exception.RSENotFound('RSE \'%s\' cannot be found' % rse)
    old_rse.delete(session=session)
    del_rse_attribute(rse=rse, key=rse, session=session)
def get_rse(rse, rse_id=None, session=None):
    """
    Get a RSE or raise if it does not exist.

    The lookup is by name when ``rse`` is truthy and by id otherwise; rows
    flagged as deleted are never returned.

    :param rse: The rse name.
    :param rse_id: The rse id. To be used if the rse parameter is none.
    :param session: The database session in use.
    :returns: The RSE row, with its ``rse_type`` also stored under ``'type'``.
    :raises RSENotFound: If referred RSE was not found in the database.
    """
    false_value = False  # To make pep8 checker happy ...
    if rse:
        selector = models.RSE.rse == rse
    else:
        selector = models.RSE.id == rse_id

    try:
        tmp = session.query(models.RSE).\
            filter(sqlalchemy.and_(models.RSE.deleted == false_value,
                                   selector))\
            .one()
        tmp['type'] = tmp.rse_type
        return tmp
    except sqlalchemy.orm.exc.NoResultFound:
        # NOTE(review): the message reports ``rse`` even when the lookup was
        # done by ``rse_id`` -- kept unchanged for callers matching on it.
        raise exception.RSENotFound('RSE \'%s\' cannot be found' % rse)
def get_rse_id(rse, session=None):
    """
    Get a RSE ID or raise if it does not exist.

    :param rse: the rse name.
    :param session: The database session in use.
    :returns: The rse id.
    :raises RSENotFound: If referred RSE was not found in the database.
    """
    try:
        # Only the id column is selected, so the row is a 1-tuple.
        return session.query(models.RSE.id).filter_by(rse=rse).one()[0]
    except sqlalchemy.orm.exc.NoResultFound:
        raise exception.RSENotFound('RSE \'%s\' cannot be found' % rse)
def get_rse_name(rse_id, session=None):
    """
    Get a RSE name or raise if it does not exist.

    :param rse_id: the rse uuid from the database.
    :param session: The database session in use.
    :returns: The rse name.
    :raises RSENotFound: If referred RSE was not found in the database.
    """
    try:
        # Only the name column is selected, so the row is a 1-tuple.
        return session.query(models.RSE.rse).filter_by(id=rse_id).one()[0]
    except sqlalchemy.orm.exc.NoResultFound:
        raise exception.RSENotFound('RSE with ID \'%s\' cannot be found' % rse_id)
def update_rse(rse, parameters, session=None):
    """
    Update RSE properties like availability or name.

    Availability is kept as a bit mask: read = 4, write = 2, delete = 1.
    A parameter value of exactly ``True`` sets the bit; any other value
    clears it. When the RSE is renamed, an attribute keyed by the new name
    is added and the one keyed by the old name is removed.

    :param rse: the current name of the rse.
    :param parameters: A dictionary of properties (name, availability_read,
        availability_write, availability_delete as keys).
    :param session: The database session in use.
    :raises RSENotFound: If RSE is not found.
    """
    try:
        query = session.query(models.RSE).filter_by(rse=rse).one()
    except sqlalchemy.orm.exc.NoResultFound:
        raise exception.RSENotFound('RSE \'%s\' cannot be found' % rse)

    rse_id = query.id

    # Start from the stored availability, if any.
    # NOTE(review): assumes iterating the row yields (column, value) pairs --
    # confirm against the model base class.
    availability = 0
    for column in query:
        if column[0] == 'availability':
            availability = column[1] or availability

    param = {}
    availability_mapping = {'availability_read': 4, 'availability_write': 2, 'availability_delete': 1}
    for key in parameters:
        if key == 'name':
            param['rse'] = parameters['name']
        if key in availability_mapping:
            if parameters[key] is True:
                availability = availability | availability_mapping[key]
            else:
                availability = availability & ~availability_mapping[key]
    param['availability'] = availability
    query.update(param)

    if 'name' in parameters:
        add_rse_attribute(rse=parameters['name'], key=parameters['name'], value=1, session=session)
        query = session.query(models.RSEAttrAssociation).filter_by(rse_id=rse_id).filter(models.RSEAttrAssociation.key == rse)
        rse_attr = query.one()
        rse_attr.delete(session=session)
def _retry_on_deadlock(exc):
    """Retry predicate: report whether ``exc`` is a database deadlock.

    Logs a warning when a ``DBDeadlock`` is seen.

    :param exc: The exception raised by the DB API call.
    :returns: True if ``exc`` is a ``db_exception.DBDeadlock``, else False.
    """
    deadlocked = isinstance(exc, db_exception.DBDeadlock)
    if deadlocked:
        LOG.warning("Deadlock detected. retrying...")
    return deadlocked
def get_blob_data(context, uri, session):
    """Download blob data from database.

    :param context: Request context (not used by this function).
    :param uri: Blob location; everything after its first six characters is
        used as the blob data id.
    :param session: Database session used for the query.
    :returns: The ``data`` attribute of the matching ``ArtifactBlobData`` row.
    :raises exception.NotFound: If no row with that id exists.
    """
    # NOTE(review): the 6-character prefix is presumably a URI scheme such as
    # "sql://" -- confirm against the code that builds these URIs.
    blob_data_id = uri[6:]
    try:
        blob_data = session.query(
            models.ArtifactBlobData).filter_by(id=blob_data_id).one()
    except orm.exc.NoResultFound:
        msg = _("Cannot find a blob data with id %s.") % blob_data_id
        raise exception.NotFound(msg)
    return blob_data.data
# NOTE(review): truncated fragment -- the signature has no trailing colon and
# the body is missing, so this line is not valid Python as it stands. Restore
# the full definition from the original project before using it.
def assertTableNotExists(self, table)
def load_tweets():
    """Load tweet data from the scraped Twitter data file into the database.

    Each line of ``missing_dates_2.txt`` is a ``|``-separated record:
    handle, tweet id, text, timestamp, profile location, place id.
    URLs are stripped from the text, the text is classified, and a row is
    stored only when the tweet references a candidate. Tweets that violate
    an integrity constraint are rolled back and skipped.
    """
    # Tweet.query.delete()

    # The context manager closes the file even if a row fails to load.
    with open("missing_dates_2.txt") as tweet_file:
        for row in tweet_file:
            row = row.rstrip()
            tweet_data = row.split("|")
            handle = tweet_data[0]

            # Foreign key to users table.
            # NOTE(review): ``first()`` yields None for an unknown handle, in
            # which case ``user_id.user_id`` below raises AttributeError --
            # decide whether such rows should be skipped instead.
            user_id = User.query.filter(User.handle == handle).first()

            # Converting the epoch timestamp.
            # NOTE(review): ``fromtimestamp`` returns local time, not UTC --
            # confirm this is what the schema expects.
            timestamp = datetime.datetime.fromtimestamp(float(tweet_data[3]))

            # For tweets that don't have location data
            tweet_data[4] = tweet_data[4] or None
            tweet_data[5] = tweet_data[5] or None

            # Removing URLs from the tweet
            clean_tweet = re.sub(r"http\S+", "", tweet_data[2])
            nb_classification = run_classifier([clean_tweet])

            # Returns Trump/Clinton/Both for sorting later
            candidate = parsing_candidates(clean_tweet)
            if not candidate:
                continue

            try:
                tweet = Tweet(user_id=user_id.user_id,
                              tweet_id=tweet_data[1],
                              text=clean_tweet,
                              timestamp=timestamp,
                              profile_location=tweet_data[4],
                              place_id=tweet_data[5],
                              naive_bayes=nb_classification[0],
                              referenced_candidate=candidate)
                db.session.add(tweet)
                db.session.flush()
                db.session.commit()
                print("Tweet added: {},{}".format(tweet.tweet_id, tweet.timestamp))
            except sqlalchemy.exc.IntegrityError:
                print("******flush or integrity error,rolling back! : {},{}".format(tweet_data[1], timestamp))
                # Preventing duplicate tweets from accidentally being added
                db.session.rollback()
                continue
################################################################################
# Need to actually create the keyword file!
def post_users():
    """
    Create a user from the request payload and register a Stripe customer.

    Fields read from ``g.req.data``:

    :param email: required, user's email, must be unique
    :param password: required, password hash
    :param salt: required, sent by client
    :param phone_id: required, phone id for client, may not be unique
    :param phone_os: required, must be one of ``SUPPORTED_PHONE_OS``
    :param state: required, two letter string containing state code/abbrev.
    :param subbed: optional, whether user is subbed or not, defaults false
    :param alt_email: optional, user's alternate email, must be unique
    :returns: JSON response carrying the generated user token and Stripe id
    :raises InvalidUsage: on an unsupported ``phone_os``, a duplicate e-mail,
        or a database operational error (status code 500)
    """
    payload = g.req.data

    try:
        phone_os = str(payload.get('phone_os')).lower()
        if phone_os not in SUPPORTED_PHONE_OS:
            raise InvalidUsage("\"{}\" is not currently supported.".format(phone_os))
    except ValueError:
        raise InvalidUsage("\"phone_os\" must be a string.")

    new_user = User(
        email=payload.get('email'),
        password=payload.get('password'),
        salt=payload.get('salt'),
        phone_id=payload.get('phone_id'),
        phone_os=payload.get('phone_os').lower(),
        subbed=payload.get('subbed', False),
        state=payload.get('state'),
        token=generate_user_token(),
        alt_email=payload.get('alt_email'),
    )
    db.session.add(new_user)

    try:
        db.session.commit()
    except sqlalchemy.exc.OperationalError as db_error:
        print_log(db_error)
        db.session.rollback()
        raise InvalidUsage(message="Internal configuration error.",
                           status_code=500)
    except sqlalchemy.exc.IntegrityError as db_error:
        print_log(db_error)
        duplicate_msg = "Provided primary/alternative e-mail already exists."
        db.session.rollback()
        raise InvalidUsage(message=duplicate_msg)

    # The user row exists now; attach a Stripe customer and persist its id.
    stripe_customer = stripe.Customer.create(
        description="Customer for {}".format(new_user.email),
        email=new_user.email,
    )
    new_user.stripe_cus_id = stripe_customer.id
    db.session.add(new_user)
    db.session.commit()

    g.res.message = "Added new user with e-mail \"{}\".".format(new_user.email)
    user_summary = {'token': new_user.token, 'stripe_id': new_user.stripe_cus_id}
    g.res.update_data({'user': user_summary})
    g.res.status_code = 201
    return jsonify(g.res)
# PATCH /users/
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。