如何使用 ExternalTask​​Sensor 触发 Airflow DAG 独立运行

如何解决如何使用 ExternalTask​​Sensor 触发 Airflow DAG 独立运行

我构建了两个 DAG(dag_a,dag_b) 并在 dag_b 中创建了一个外部任务传感器,它会戳破 dag_a。这些 DAG 有两个用例:

  1. 同时调度dag_a和dag_b,使用依赖先处理dag_a再处理dag_b
  2. 独立手动触发 dag_b,无需关心 dag_a。

使用 ExternalTask​​Sensor,用例 1 工作得很好。但是用例 2 不起作用。并且 dag_b 将在 ExternalTask​​Sensor 处为 dag_a 停止并永远戳。有没有办法在某些条件下跳过 ExternalTask​​Sensor 并独立运行 dag_b?

解决方法

Airflow 2.0.0 开始,我认为没有必要使用 ExternalTaskSensor。使用 TriggerDagRunOperator,您可以创建和调度充当控制器的 DAG,有两个任务负责触发 DAG_A 和 DAG_B。

使用 wait_for_completion 参数,您可以实现第一个用例,而不会影响独立触发 DAG_B 的可能性。您还可以指定期望 poke_interval允许或失败状态。

Docs:

  • wait_for_completion (bool) -- 是否等待 dag 运行完成。 (默认:False)
  • poke_interval (int) -- 当 wait_for_completion=True 时戳间隔以检查 dag 运行状态。 (默认:60)
  • allowed_states (list) -- 允许的状态列表,默认为 ['success']
  • failed_states (list) -- 失败或不允许的状态列表,默认为无

这是指向 example DAG 随 Airflow 提供的链接。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?