微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在金字塔网络应用程序中手动提交 sqlalchemy 数据库事务?

如何解决如何在金字塔网络应用程序中手动提交 sqlalchemy 数据库事务?

我有一个 Pyramid Web 应用程序,需要在向 sqlalchemy 数据库提交更改后运行 Celery 任务。我知道我可以使用 request.tm.get().addAfterCommitHook() 来做到这一点。但是,这对我不起作用,因为我还需要在视图中使用 celery 任务的 task_id。因此,我需要在对 Celery 任务调用 task.delay() 之前提交对数据库的更改。

zope.sqlalchemy 文档说我可以使用 transaction.commit() 手动提交。但是,这对我不起作用; celery 任务在更改提交到数据库之前运行,即使我在调用 task.delay() 之前调用了 transaction.commit()

我的金字塔视图代码如下所示:

ride=appstruct_to_ride(dbsession,appstruct)
dbsession.add(ride)

# Flush dbsession so ride gets an id assignment
dbsession.flush()

# Store ride id
ride_id=ride.id
log.info('Created ride {}'.format(ride_id))

# Commit ride to database
import transaction
transaction.commit()

# Queue a task to update ride's weather data
from ..processing.weather import update_ride_weather
update_weather_task=update_ride_weather.delay(ride_id)

url = self.request.route_url('rides')
return HTTPFound(
    url,content_type='application/json',charset='',text=json.dumps(
        {'ride_id':ride_id,'update_weather_task_id':update_weather_task.task_id}))

我的 celery 任务如下所示:

@celery.task(bind=True,ignore_result=False)
def update_ride_weather(self,ride_id,train_model=True):

    from ..celery import session_factory
    
    logger.debug('Received update weather task for ride {}'.format(ride_id))

    dbsession=session_factory()
    dbsession.expire_on_commit=False

    with transaction.manager:
        ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()

芹菜任务失败并显示 noresultFound:

  File "/app/cycling_data/processing/weather.py",line 478,in update_ride_weather
    ride=dbsession.query(Ride).filter(Ride.id==ride_id).one()
  File "/usr/local/lib/python3.8/site-packages/sqlalchemy/orm/query.py",line 3282,in one
    raise orm_exc.noresultFound("No row was found for one()")

当我事后检查数据库时,我看到该记录实际上是在 celery 任务运行并失败后创建的。所以这意味着 transaction.commit() 没有按预期提交事务,而是在视图返回后由 zope.sqlalchemy 机器自动提交更改。如何在我的视图代码中手动提交事务?

解决方法

request.tmpyramid_tm 定义,可以是线程本地 transaction.manager 对象或每个请求对象,具体取决于您如何配置 pyramid_tm(查找 { {1}} 在某处被定义以确定正在使用哪个。

你的问题很棘手,因为无论你做什么都应该符合 pyramid_tm.manager_hook 以及它期望事情如何运作。具体来说,它计划围绕请求的生命周期控制事务 - 尽早提交对于该事务来说不是一个好主意。 pyramid_tm 试图帮助提供故障安全功能,以便在请求生命周期中的任何地方发生任何故障时回滚整个请求 - 而不仅仅是在您的可调用视图中。

选项 1:

无论如何都要早点提交。如果您打算这样做,那么提交后的失败无法回滚已提交的数据,因此您可以部分提交请求。好的,好的,这就是您的问题,所以答案是使用 pyramid_tm 可能后跟 request.tm.commit() 来为任何后续更改开始一个新的。您还需要注意不要跨该边界共享 sqlalchemy 托管对象,例如 request.tm.begin() 等,因为它们需要刷新/合并到新事务中(SQLAlchemy 的身份缓存不能信任从不同事务加载的数据)默认情况下,因为这就是隔离级别的工作方式)。

选项 2:

为您想要提早提交的数据启动一个单独的事务。好的,假设您没有使用任何像 request.usertransaction.manager 这样的线程局部变量,那么您可能可以开始自己的事务并提交它,而无需触及由 { 控制的 scoped_session {1}}。一些适用于 pyramid-cookiecutter-starter 项目结构的通用代码可能是:

dbsession

这还不错 - 就故障模式而言,这可能是您可以做的最好的事情,而无需将 celery 连接到 pyramid_tm 控制的 dbsession 中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。