如何解决如何在气流DAG中设置数字作为重试条件?
在我的Airflow DAG
中,我有4个tasks
task_1 >> [task_2,task_3]>> task_4
task_4
仅在成功运行task_2
和task_3
如何设置条件:
如果task_2
失败,请在2分钟后重试task_2
,并在第五次尝试后停止重试
这是我的代码:
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python_operator import Pythonoperator
args={
'owner' : 'Anti','start_date':days_ago(1)# 1 means yesterday
}
dag = DAG(dag_id='my_sample_dag',default_args=args,schedule_interval='15 * * * *')
def func1(**context):
print("ran task 1")
def func2(**context):
print("ran task 2")
def func3(**context):
print("ran task 3")
def func4(**context):
print("ran task 4")
with dag:
task_1=Pythonoperator(
task_id='task1',python_callable=func1,provide_context=True,)
task_2=Pythonoperator(
task_id='task2',python_callable=func2,provide_context=True
)
task_3=Pythonoperator(
task_id='task3',python_callable=func3,provide_context=True
)
task_4=Pythonoperator(
task_id='task4',python_callable=func4,provide_context=True
)
task_1 >> [task_2,task_3]>> task_4 # t2,t3 runs parallel right after t1 has ran
解决方法
每个运算符都支持retry_delay
和retries
-Airflow documention。
重试(int)–在执行之前应重试的次数 任务失败
retry_delay(datetime.timedelta)–重试之间的延迟
如果要将其应用于所有任务,则只需编辑args字典:
args={
'owner' : 'Anti','retries': 5,'retry_delay': timedelta(minutes=2),'start_date':days_ago(1)# 1 means yesterday
}
如果您只想将其应用于task_2,则可以将其直接传递给PythonOperator
-在这种情况下,其他任务将使用默认设置。
对您的参数发表评论,不建议设置动态相对值start_date
,而是设置固定的绝对日期。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。