How do I make a specific operator run immediately when any operator before it fails?
I'm fairly new to Apache Airflow and want to build a pipeline out of BashOperators.
Normally the DAG runs like t1 >> t2 >> ... >> t9 >> t10, right?
What I want instead is: if anything before t9 fails, t9 and t10 should execute immediately.
Here's what I tried:
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='vp_bash_dag',
    default_args=args,
    schedule_interval=None,  # None, not the string 'None'
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=['vp', 'example'],
    params={"example_key": "example_value"},
) as dag:
    # each task succeeds only if its marker file exists
    t01 = BashOperator(task_id='t01', bash_command='[ -f /tmp/1 ] && exit 0 || exit 1')
    t02 = BashOperator(task_id='t02', bash_command='[ -f /tmp/2 ] && exit 0 || exit 1')
    t03 = BashOperator(task_id='t03', bash_command='[ -f /tmp/3 ] && exit 0 || exit 1')
    t04 = BashOperator(task_id='t04', bash_command='[ -f /tmp/4 ] && exit 0 || exit 1')
    t05 = BashOperator(task_id='t05', bash_command='[ -f /tmp/5 ] && exit 0 || exit 1')
    t06 = BashOperator(task_id='t06', bash_command='[ -f /tmp/6 ] && exit 0 || exit 1')
    t07 = BashOperator(task_id='t07', bash_command='[ -f /tmp/7 ] && exit 0 || exit 1')
    t08 = BashOperator(task_id='t08', bash_command='[ -f /tmp/8 ] && exit 0 || exit 1')
    t09 = BashOperator(
        task_id='t09',
        bash_command='[ -f /tmp/9 ] && exit 0 || exit 1',
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    t10 = BashOperator(task_id='t10', bash_command='echo done')

    t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10
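For context, the `[ -f /tmp/N ] && exit 0 || exit 1` pattern used in each task makes the task's outcome depend on a marker file: exit code 0 (success) when the file exists, 1 (failure) otherwise, which makes it easy to simulate failures by creating or deleting files. A quick standalone check of that logic (the marker path here is just a throwaway example):

```shell
# Same test each BashOperator runs, against a disposable marker file.
marker=/tmp/airflow_demo_marker

rm -f "$marker"
[ -f "$marker" ] && echo "task would succeed" || echo "task would fail"   # fail

touch "$marker"
[ -f "$marker" ] && echo "task would succeed" || echo "task would fail"   # succeed

rm -f "$marker"
```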
Can anyone point me to some keywords or hints?
Update
This gives the result I wanted.
It may not look exactly like what I described above, but the outcome is what I expected.
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='vp_failed_branch',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
) as dag:
    t01 = BashOperator(task_id='t01', bash_command='echo runed >> /tmp/airflow/01.runed')
    t02 = BashOperator(task_id='t02', bash_command='echo runed >> /tmp/airflow/02.runed')
    t03 = BashOperator(task_id='t03', bash_command='echo runed >> /tmp/airflow/03.runed')
    t04 = BashOperator(task_id='t04', bash_command='echo runed >> /tmp/airflow/04.runed')
    t05 = BashOperator(task_id='t05', bash_command='echo runed >> /tmp/airflow/05.runed')
    t06 = BashOperator(task_id='t06', bash_command='echo runed >> /tmp/airflow/06.runed')
    t07 = BashOperator(task_id='t07', bash_command='echo runed >> /tmp/airflow/07.runed')
    t08 = BashOperator(task_id='t08', bash_command='echo runed >> /tmp/airflow/08.runed')
    t09 = BashOperator(task_id='t09', bash_command='echo runed >> /tmp/airflow/09.runed')
    t10 = BashOperator(task_id='t10', bash_command='echo runed >> /tmp/airflow/10.runed')
    t09_f = BashOperator(
        task_id='t09_f',
        bash_command='echo runed >> /tmp/airflow/09_f.runed',
        trigger_rule=TriggerRule.ONE_FAILED,  # fire as soon as any upstream fails
    )
    t10_f = BashOperator(task_id='t10_f', bash_command='echo runed >> /tmp/airflow/10_f.runed')

    # main chain
    t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10
    # failure branch: every task in the main chain also feeds t09_f
    [t01, t02, t03, t04, t05, t06, t07, t08] >> t09_f
    t09_f >> t10_f
Solution
What you're looking for is chain:
from airflow.utils.helpers import chain

chain([t01, t02, t03, t04, t05, t06, t07, t08], t09, t10)
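If it helps to see exactly which dependency edges that `chain` call produces, here is a small plain-Python sketch of chain's pairing behavior (illustrative only, not Airflow's actual implementation, and it needs no Airflow install):

```python
# Toy model of the edges Airflow's chain() creates between adjacent arguments.
def chain_edges(*groups):
    edges = []
    for up, down in zip(groups, groups[1:]):
        ups = up if isinstance(up, list) else [up]
        downs = down if isinstance(down, list) else [down]
        if isinstance(up, list) and isinstance(down, list):
            # two adjacent lists must be the same length and pair element-wise
            if len(ups) != len(downs):
                raise ValueError("adjacent lists must have the same length")
            edges.extend(zip(ups, downs))
        else:
            # list-to-single (fan-in) or single-to-list (fan-out)
            edges.extend((u, d) for u in ups for d in downs)
    return edges

tasks = [f"t{i:02d}" for i in range(1, 9)]       # t01 .. t08
edges = chain_edges(tasks, "t09", "t10")
# every tNN >> t09 (fan-in), then a single t09 >> t10 edge
```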
Note that in Airflow >= 2.0 the import path is:
from airflow.models.baseoperator import chain
The resulting dependencies: each of t01 through t08 feeds into t09, followed by t10.
Since t09 has trigger_rule=TriggerRule.ONE_FAILED, it runs as soon as any one of t01..t08 fails.
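The key property of ONE_FAILED is that it does not wait for all upstream tasks to finish; a toy evaluation of the rule (illustrative only, not Airflow's scheduler code):

```python
# ONE_FAILED in miniature: the downstream task becomes eligible as soon as
# at least one upstream task is in a failed state, even while others run.
def one_failed_ready(upstream_states):
    return any(state == "failed" for state in upstream_states)

print(one_failed_ready(["success", "failed", "running"]))  # True: t09 fires now
print(one_failed_ready(["success", "running"]))            # False: nothing has failed (yet)
```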
The code you posted worked for me as-is (i.e., simply setting trigger_rule=TriggerRule.ONE_FAILED on t09). See the screenshot linked below; this was run on Airflow 2.0.
Task t01 fails and t09 executes, followed by t10. The DAG run succeeds.
What behavior were you seeing with your original code?
Full code for reference (with slightly different bash commands):
from datetime import timedelta

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

args = {
    "owner": "airflow",
}

with DAG(
    dag_id="vp_failed_branch",
    default_args=args,
    schedule_interval=None,
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
    tags=["vp", "example"],
    params={"example_key": "example_value"},
    catchup=False,
) as dag:
    t01 = BashOperator(task_id="t01", bash_command="exit 1")  # forced failure
    t02 = BashOperator(task_id="t02", bash_command="[ -f /tmp/2 ] && exit 0 || exit 1")
    t03 = BashOperator(task_id="t03", bash_command="[ -f /tmp/3 ] && exit 0 || exit 1")
    t04 = BashOperator(task_id="t04", bash_command="[ -f /tmp/4 ] && exit 0 || exit 1")
    t05 = BashOperator(task_id="t05", bash_command="[ -f /tmp/5 ] && exit 0 || exit 1")
    t06 = BashOperator(task_id="t06", bash_command="echo runed >> /tmp/airflow/06.runed")
    t07 = BashOperator(task_id="t07", bash_command="echo runed >> /tmp/airflow/07.runed")
    t08 = BashOperator(task_id="t08", bash_command="echo runed >> /tmp/airflow/08.runed")
    t09 = BashOperator(
        task_id="t09",
        bash_command="echo foo",
        trigger_rule=TriggerRule.ONE_FAILED,
    )
    t10 = BashOperator(task_id="t10", bash_command="echo bar")

    t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10