当外部任务失败时,Airflow ExternalTask​​Sensor不会失败

如何解决当外部任务失败时,Airflow ExternalTask​​Sensor不会失败

我试图使用Airflow 1.10.11中的ExternalTaskSensor来管理坐标,有些麻烦。我已经开发了这段代码来测试功能

import time
from datetime import datetime,timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunoperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import Pythonoperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

sensors_dag = DAG(
    "test_launch_sensors",schedule_interval=None,start_date=datetime(2020,2,14,0),dagrun_timeout=timedelta(minutes=150),tags=["DEMO"],)

dummy_dag = DAG(
    "test_dummy_dag",)


def print_context(ds,**context):
    pprint(context['conf'])


with dummy_dag:
    starts = DummyOperator(task_id="starts",dag=dummy_dag)
    empty = Pythonoperator(
        task_id="empty",provide_context=True,python_callable=print_context,dag=dummy_dag,)
    ends = DummyOperator(task_id="ends",dag=dummy_dag)

    starts >> empty >> ends

with sensors_dag:
    trigger = TriggerDagRunoperator(
        task_id=f"trigger_{dummy_dag.dag_id}",trigger_dag_id=dummy_dag.dag_id,conf={"key": "value"},execution_date="{{ execution_date }}",)
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",external_dag_id=dummy_dag.dag_id,external_task_id="ends",Failed_states=["Failed","upstream_Failed"],poke_interval=5,timeout=120,)
    trigger >> sensor

这个想法是,一个dag用TriggerDagRunoperator触发另一个。这会将execution_date设置为两个dag中相同的值。当dummy_dag一个任务ends的状态为success时,此方法非常有效。

但是,如果我这样强迫中间任务失败:

def print_context(ds,**context):
    pprint(context['conf'])
    raise Exception('ouch')

传感器未检测到Failedupstream_Failed状态,并且一直运行直到超时。我使用了Failed_states参数来指示哪些状态需要考虑为失败,但似乎不起作用。

我做错什么了吗?

解决方法

不幸的是,ExternalTaskSensor操作仅查找DAG运行状态;一旦监视的DAG运行达到允许状态之一,传感器就会停止,并标记为成功,而不会引用监视的DAG运行状态。默认情况下,传感器仅查找SUCCESS状态,因此,如果没有超时,则它会一直受监控的DAG运行失败而永远戳。

虽然您可以使用超时,但就像您需要传感器失败一样,如果外部DAG运行失败,则它是自己的DAG运行,就像未满足下一个任务的依赖关系一样。不幸的是,这需要您编写自己的传感器。

这是我的实现;它是ExternalTaskSensor()类的简化版本,适合我的简单需求(无需检查特定的任务ID或执行日期相同的任何内容):

from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State

class ExternalDagrunSensor(BaseSensorOperator):
    """
    Waits for a different DAG to complete; if the dagrun has failed,this
    task fails itself as well.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    """

    template_fields = ["external_dag_id"]
    ui_color = "#19647e"

    @apply_defaults
    def __init__(self,external_dag_id,*args,**kwargs):
        super().__init__(*args,**kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self,context,session=None):
        dag_id,execution_date = self.external_dag_id,context["execution_date"]
        self.log.info("Poking for %s.%s on %s ... ",dag_id,execution_date)

        state = (
            session.query(DagRun.state)
            .filter(
                DagRun.dag_id == dag_id,DagRun.execution_date == execution_date,DagRun.state.in_((State.SUCCESS,State.FAILED)),)
            .scalar()
        )
        if state == State.FAILED:
            raise AirflowFailException(
                f"The external DAG run {dag_id} {execution_date} has failed,"
                f"failing {self.task_id} task"
            )
        return state is not None

基本传感器实现将重复调用poke()方法,直到返回True(或达到可选的超时时间)为止,并通过提高AirflowFailException将任务状态设置为失败,立即重试。如果要安排下游任务运行,则取决于下游任务配置。

,

ExternalTask​​Sensor只是戳入直到达到某些预期状态,该状态不打算与外部任务状态进行映射。

它默认为[State.SUCCESS],这就是如果成功的话您没有任何问题的原因。 添加allowed_states = [State.SUCCESS,State.failed,State.upstream_failed] 对于您的代码,至少可以确保外部任务已完成。

此外,如果soft_fail = False,则可以设置超时以使其失败

如果您希望传感器在外部任务失败时失败,则需要编写自己的传感器实现。

例如,这是我如何检查Dagrun的Last Dagrun以匹配特定状态

@provide_session
def poke(self,session=None):
    """
    Checks if latest dag_run State is in expected state else keeps polling...
    :param context:
    :param session:
    :return:
    """
    DR = DagRun
    self.log.info(
        f"Poking for {self.external_dag_id},{self.allowed_states} -> {self.state_condition} ... "
    )
    # If state is expected to match
    if self.state_condition:
        query = session.query(DR).filter(DR.dag_id == self.external_dag_id,DR.state.notin_(self.allowed_states))
    # If state is not expected to match
    else:
        query = session.query(DR).filter(DR.dag_id == self.external_dag_id,DR.state.in_(self.allowed_states))
    # Filter by last_dagrun,could be max(execution_date) also but avoiding such aggregation
    # by sorting dag_run chronologically in descendent order
    query = query.order_by(DR.execution_date.desc()).first()
    session.commit()
    return not query

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?