如何解决气流自定义 schedule_interval 设置
有没有办法在 Airflow DAG 中设置/编写自定义 schedule_interval? 我正在寻找的是一种在 DAG 除假期(如圣诞节、劳动节、独立日等)之外每天运行时设置时间表的方法
使用标准的 cron 表达式是不可能实现的。非常感谢任何帮助/指南。
解决方法
使用 PythonBranchOperator
或创建一个继承 BaseBranchOperator
的新运算符,您可以在其中实现跳过逻辑。我相信你需要一个 DummyOperator
作为“跳过”分支,你的常规 DAG 流作为另一个分支。对于您的 cron 表达式,请使用任何正常计划,并在处理分支的任务中实现自定义跳过。
没有对这种类型的调度的原生支持,但您可以通过在工作流程的开头添加 ShortCircuitOperator
来解决此问题。
这个操作符执行一个可调用的python。如果条件满足,则继续工作流,如果条件不满足,则将所有下游任务标记为已跳过。
可能的解决方案是:
import holidays
def decide(**kwargs):
# Select country
us_holidays = holidays.US()
if str(kwargs['execution_date']) in us_holidays:
return False # Skip workflow if it's a holiday.
return True
dag = DAG(
dag_id='mydag',schedule_interval='@daily',default_args=default_args,)
start_op = ShortCircuitOperator(
task_id='start_task',python_callable=decide,provide_context=True,# Remove this line if you are using Airflow>=2.0.0
dag=dag
)
#Replace this with your actual Operator in your workflow.
next_op = Operator(
task_id='next_task',dag=dag
)
start_op >> next_op
此解决方案基于 Detecting a US Holiday 中提供的答案,我没有对其进行测试,但它应该可以工作。在任何情况下,您都可以将 decide
中的逻辑替换为任何检测日期是否为假日的方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。