如何解决在任务成功/失败时重新安排 DAG
考虑一个非常简单的 Apache Airflow DAG:
FileSensor -> Pythonoperator
其中 FileSensor
正在等待一些文件出现(具有相对较短的 poke_interval
)并且 Pythonoperator
处理这些文件。此 DAG 已安排 @once
无限期运行 - 我如何将其设置为在成功(或失败)后重新安排为从 Pythonoperator
内再次运行?
解决方法
总的来说,我认为 Elad 的建议可能有效,但我认为这是一种不好的做法。 DAG 的设计(和名称)是非循环的,因此在其中创建任何类型的循环都可能导致其出现意外行为。
另外,根据 Airflow 文档,如果您计划使用外部 dag 触发器,您应该将 dag 计划设置为无。就我个人而言,我不确定它是否一定会破坏某些东西,但它绝对可以为您提供意想不到的输出。如果出现问题,您可能需要更长的时间来调试它。
恕我直言,更好的方法是让您尝试重新思考您的设计。如果您需要在失败时重新安排 dag,您可以利用传感器 https://www.astronomer.io/guides/what-is-a-sensor 的重新安排模式。不确定为什么要在成功时重新运行它,如果源中有多个文件,我想说的是在 dag 脚本中创建多个具有可变参数和 for 循环的传感器。
,正如 @Elad 建议的那样,TriggerDagRunOperator
是要走的路。结合 reset_dag_run
和 execution_date
参数,我能够解决这个问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。