Python rq:处理作业的成功或失败

如何解决Python rq:处理作业的成功或失败

我在我的应用中设置了一个相当基本的(到目前为止)队列:

  • 作业 1 (backup):备份我将要替换的 sql
  • 作业 2 (update):执行实际的表删除/更新

非常简化的代码

from rq import Queue
from rq.decorators import job


@job('backup')
def backup(db,table,conn_str):
    backup_sql = "SELECT * INTO {}.dbo.{}_backup from {}.dbo.{}".format(db,db,collection)

@job('update')
def update(db,conn_str,keys,data):
    truncate_sql = "TruncATE TABLE {}.dbo.{}".format(db,collection)
    sql_cursor.execute(truncate_sql)
            
    for sql_row in data:
        sql = "INSERT INTO {}.dbo.{} ({}) values ({})".format(db,",".join(keys),".join(["?"] * len(sql_row)))
        sql_cursor.execute(sql,sql_row)
    sql_cursor.commit()

def update_data():
    ...
    update_queue = Queue('update',connection=redis_conn)

    backup_job = update_queue.enqueue('backup',result_ttl=current_app.config['RESULT_TTL'],)
    update_job = update_queue.enqueue('update',)


我想做的是找到一种观看update方法。如果失败,我想运行一个作业来恢复在 backup 作业中创建的备份。如果成功,我想运行不同的作业来简单地删除备份。

解决这个问题的正确方法是什么?我对 rq 很陌生,正在查看文档,但没有找到轮询 update 以了解成功/失败的方法,也没有找到处理任一结果的惯用方法

解决方法

一个选项是创建另一个名为“检查器”的第三个作业,例如,它将根据“更新”作业的状态决定要做什么。为此,您必须指定依赖关系。

depends_on 指定必须在之前完成的另一个作业(或作业 ID) 此作业将排队。

def checker(*args,**kwargs):
    pass

checker_job = update_queue.enqueue('checker',*args,depends_on=update_job.id,result_ttl=current_app.config['RESULT_TTL'])

然后检查“checker”内部依赖的状态,并根据该状态加载备份或删除它。

def checker(*args,**kwargs):
    job = rq.get_current_job()

    update_job = job.dependency
    
    if update_job.status == 'failed':
        # do the stuff here
    else:  # or elif
        # do the stuff here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?