每当特定运算符前面的任何运算符失败时,如何立即执行运算符?

如何解决每当特定运算符前面的任何运算符失败时,如何立即执行运算符?

我对 apache 气流有点陌生,想做一些 bashoperators。

理想情况下,dag 像 t1 >> t2 >> ... >> t9 >> t10 一样运行,对吗?

但我想要的是,如果 t9 之前的事件失败,它将立即执行 t9 和 t10。

这是我尝试过的:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator

from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',}

with DAG(
    dag_id='vp_bash_dag',default_args=args,schedule_interval='None',start_date=days_ago(2),dagrun_timeout=timedelta(minutes=60),tags=['vp','example'],params={"example_key": "example_value"},) as dag:

    t01 = BashOperator(
    task_id='t01',bash_command='[ -f /tmp/1 ] && exit 0 || exit 1',dag=dag)

    t02 = BashOperator(
    task_id='t02',bash_command='[ -f /tmp/2 ] && exit 0 || exit 1',dag=dag)

    t03 = BashOperator(
    task_id='t03',bash_command='[ -f /tmp/3 ] && exit 0 || exit 1',dag=dag)

    t04 = BashOperator(
    task_id='t04',bash_command='[ -f /tmp/4 ] && exit 0 || exit 1',dag=dag)

    t05 = BashOperator(
    task_id='t05',bash_command='[ -f /tmp/5 ] && exit 0 || exit 1',dag=dag)

    t06 = BashOperator(
    task_id='t06',bash_command='[ -f /tmp/6 ] && exit 0 || exit 1',dag=dag)

    t07 = BashOperator(
    task_id='t07',bash_command='[ -f /tmp/7 ] && exit 0 || exit 1',dag=dag)

    t08 = BashOperator(
    task_id='t08',bash_command='[ -f /tmp/8 ] && exit 0 || exit 1',dag=dag)

    t09 = BashOperator(
    task_id='t09',bash_command='[ -f /tmp/9 ] && exit 0 || exit 1',trigger_rule=TriggerRule.ONE_Failed,dag=dag)

    t10 = BashOperator(
    task_id='t10',bash_command='echo done',dag=dag)

t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10

有没有人可以用一些关键字或提示来指导我?

更新

结果是我想要的。

它可能看起来或听起来不像我说的。

但结果如我所料。

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from airflow.utils.trigger_rule import TriggerRule

args = {
    'owner': 'airflow',}

with DAG(
    dag_id='vp_Failed_branch',schedule_interval='0 0 * * *',bash_command='echo runed >> /tmp/airflow/01.runed',bash_command='echo runed >> /tmp/airflow/06.runed',bash_command='echo runed >> /tmp/airflow/07.runed',bash_command='echo runed >> /tmp/airflow/08.runed',bash_command='echo runed >> /tmp/airflow/09.runed',bash_command='echo runed >> /tmp/airflow/10.runed',dag=dag)

    t09_f = BashOperator(
    task_id='t09_f',bash_command='echo runed >> /tmp/airflow/09_f.runed',dag=dag)

    t10_f = BashOperator(
    task_id='t10_f',bash_command='echo runed >> /tmp/airflow/10_f.runed',dag=dag)


t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10
t01 >> t09_f
t02 >> t09_f
t03 >> t09_f
t04 >> t09_f
t05 >> t09_f
t06 >> t09_f
t07 >> t09_f
t08 >> t09_f
t09_f >> t10_f

解决方法

您要找的是chain

from airflow.utils.helpers import chain
chain([t01,t02,t03,t04,t05,t06,t07,t08],t09,t10)

注意 Airflow>=2.0 中的路径是:

from airflow.models.baseoperator import chain

依赖项将是:

enter image description here

由于 t09TriggerRule.ONE_FAILED,它会在 [t01,t08] 之一失败时立即运行。

,

您提供的代码最初对我有用(即,只需将 trigger_rulet09 添加为 TriggerRule.ONE_FAILED)。请参阅下面的屏幕截图链接。在 Airflow 2.0 上运行。

Task t01 fails and t09 executes followed by t10. DAG succeeded.

您在原始代码中看到了什么行为?

完整代码供参考(与 bash 命令略有不同):

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

from datetime import timedelta

args = {
    "owner": "airflow",}

with DAG(
    dag_id="vp_failed_branch",default_args=args,schedule_interval=None,start_date=days_ago(2),dagrun_timeout=timedelta(minutes=60),tags=["vp","example"],params={"example_key": "example_value"},catchup=False,) as dag:

    t01 = BashOperator(task_id="t01",bash_command="exit 1")

    t02 = BashOperator(task_id="t02",bash_command="[ -f /tmp/2 ] && exit 0 || exit 1")

    t03 = BashOperator(task_id="t03",bash_command="[ -f /tmp/3 ] && exit 0 || exit 1")

    t04 = BashOperator(task_id="t04",bash_command="[ -f /tmp/4 ] && exit 0 || exit 1")

    t05 = BashOperator(task_id="t05",bash_command="[ -f /tmp/5 ] && exit 0 || exit 1")

    t06 = BashOperator(
        task_id="t06",bash_command="echo runed >> /tmp/airflow/06.runed"
    )

    t07 = BashOperator(
        task_id="t07",bash_command="echo runed >> /tmp/airflow/07.runed"
    )

    t08 = BashOperator(
        task_id="t08",bash_command="echo runed >> /tmp/airflow/08.runed"
    )

    t09 = BashOperator(
        task_id="t09",bash_command="echo foo",trigger_rule=TriggerRule.ONE_FAILED,)

    t10 = BashOperator(task_id="t10",bash_command="echo bar")

    t01 >> t02 >> t03 >> t04 >> t05 >> t06 >> t07 >> t08 >> t09 >> t10

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?