How to make Airflow's ExternalTaskSensor fail when the external task fails
I am trying to use the ExternalTaskSensor in Airflow 1.10.11 to coordinate DAGs, and I am having some trouble. I developed this code to test the functionality:
import time
from datetime import datetime, timedelta
from pprint import pprint

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from airflow.utils.state import State

sensors_dag = DAG(
    "test_launch_sensors",
    schedule_interval=None,
    start_date=datetime(2020, 2, 14, 0),
    dagrun_timeout=timedelta(minutes=150),
    tags=["DEMO"],
)

dummy_dag = DAG(
    "test_dummy_dag",
)
def print_context(ds, **context):
    pprint(context['conf'])

with dummy_dag:
    starts = DummyOperator(task_id="starts", dag=dummy_dag)
    empty = PythonOperator(
        task_id="empty",
        provide_context=True,
        python_callable=print_context,
        dag=dummy_dag,
    )
    ends = DummyOperator(task_id="ends", dag=dummy_dag)

    starts >> empty >> ends
with sensors_dag:
    trigger = TriggerDagRunOperator(
        task_id=f"trigger_{dummy_dag.dag_id}",
        trigger_dag_id=dummy_dag.dag_id,
        conf={"key": "value"},
        execution_date="{{ execution_date }}",
    )
    sensor = ExternalTaskSensor(
        task_id="wait_for_dag",
        external_dag_id=dummy_dag.dag_id,
        external_task_id="ends",
        failed_states=["failed", "upstream_failed"],
        poke_interval=5,
        timeout=120,
    )

    trigger >> sensor
The idea is that one DAG triggers the other with a TriggerDagRunOperator. This sets the execution_date to the same value in both DAGs. This works perfectly when the last task of dummy_dag, ends, finishes with a success state.
However, if I force the intermediate task to fail like this:

def print_context(ds, **context):
    pprint(context['conf'])
    raise Exception('ouch')

the sensor does not detect the failed or upstream_failed states, and it keeps running until it times out. I am using the failed_states parameter to indicate which states should be considered failures, but it does not seem to work.

Am I doing something wrong?
Solution
Unfortunately, the ExternalTaskSensor operator only looks for one of the allowed states; as soon as the monitored DAG run reaches one of those states, the sensor stops and is marked as successful, without reference to the actual state of the monitored DAG run. By default the sensor only looks for the SUCCESS state, so without a timeout it would just keep poking forever while the monitored DAG run has failed.

While you could use a timeout, like you I needed the sensor to fail its own DAG run if the external DAG run failed, as if the dependencies for the next task were not being met. Unfortunately, this requires writing your own sensor.
Here is my implementation; it is a simplified version of the ExternalTaskSensor class, adapted to my simpler needs (no need to check for a specific task id, or for anything other than the same execution date):
from airflow.exceptions import AirflowFailException
from airflow.models import DagRun
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State


class ExternalDagrunSensor(BaseSensorOperator):
    """
    Waits for a different DAG to complete; if the dagrun has failed, this
    task fails itself as well.

    :param external_dag_id: The dag_id that contains the task you want to
        wait for
    :type external_dag_id: str
    """

    template_fields = ["external_dag_id"]
    ui_color = "#19647e"

    @apply_defaults
    def __init__(self, external_dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.external_dag_id = external_dag_id

    @provide_session
    def poke(self, context, session=None):
        dag_id, execution_date = self.external_dag_id, context["execution_date"]
        self.log.info("Poking for %s on %s ... ", dag_id, execution_date)
        state = (
            session.query(DagRun.state)
            .filter(
                DagRun.dag_id == dag_id,
                DagRun.execution_date == execution_date,
                DagRun.state.in_((State.SUCCESS, State.FAILED)),
            )
            .scalar()
        )
        if state == State.FAILED:
            raise AirflowFailException(
                f"The external DAG run {dag_id} {execution_date} has failed, "
                f"failing {self.task_id} task"
            )
        return state is not None
The base sensor implementation will call the poke() method repeatedly until it returns True (or until the optional timeout is reached), and raising AirflowFailException sets the task state to failed immediately, without any retries. Whether downstream tasks are then scheduled to run depends on the downstream task configuration.
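These semantics can be sketched in plain Python, independent of Airflow. In this sketch, run_sensor, SensorTimeout and ExternalRunFailed are illustrative names (not Airflow APIs); ExternalRunFailed plays the role of AirflowFailException:

```python
import time


class SensorTimeout(Exception):
    """Raised when the poke loop exhausts its timeout."""


class ExternalRunFailed(Exception):
    """Stand-in for AirflowFailException: fail at once, no further pokes."""


def run_sensor(poke, poke_interval=0.01, timeout=1.0):
    """Call ``poke()`` repeatedly until it returns True, mimicking how the
    base sensor drives its poke() method.

    A poke callable may raise ExternalRunFailed to abort the loop
    immediately, which is what raising AirflowFailException inside poke()
    achieves in the custom sensor above.
    """
    started = time.monotonic()
    while True:
        if poke():
            return "success"
        if time.monotonic() - started > timeout:
            raise SensorTimeout("sensor timed out")
        time.sleep(poke_interval)


# A monitored run that reaches success on the third poke:
states = iter([None, None, "success"])
result = run_sensor(lambda: next(states) == "success")
```

A poke that only ever returns False ends in SensorTimeout, and one that raises ExternalRunFailed stops the loop on the spot, which is exactly the fail-fast behaviour the custom sensor adds.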
ExternalTaskSensor just pokes until some expected state is reached; it is not designed to mirror the external task's state. It defaults to [State.SUCCESS], which is why you have no problem when the external task succeeds. Adding allowed_states=[State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED] to your code would at least ensure that the external task has finished, one way or another.

Additionally, provided soft_fail=False, you can set a timeout to make the sensor fail.
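To see why widening allowed_states only guarantees completion, not success, consider this minimal stand-in for the sensor's state check. The state strings mirror the values behind airflow.utils.state.State; poke here is an illustrative function, not the real sensor method:

```python
# State values as plain strings, mirroring airflow.utils.state.State.
SUCCESS, FAILED, UPSTREAM_FAILED = "success", "failed", "upstream_failed"


def poke(run_state, allowed_states):
    """The sensor's core test: True once the run is in an allowed state."""
    return run_state in allowed_states


# Default allowed_states=[SUCCESS]: a failed run never satisfies the
# sensor, so it keeps poking until the timeout.
assert poke(FAILED, [SUCCESS]) is False

# Widened allowed_states: the sensor completes (and is marked successful)
# even though the external run failed -- it only confirms the run finished.
assert poke(FAILED, [SUCCESS, FAILED, UPSTREAM_FAILED]) is True
```

So the widened list stops the sensor from hanging, but it cannot by itself propagate the failure, which is why a custom sensor is still needed.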
If you want the sensor to fail when the external task fails, you need to write your own sensor implementation. For example, here is how I check the last DagRun of a DAG against particular states:
@provide_session
def poke(self, context, session=None):
    """
    Checks if the latest dag_run state is in the expected state, else keeps polling.

    :param context:
    :param session:
    :return:
    """
    DR = DagRun
    self.log.info(
        f"Poking for {self.external_dag_id}, {self.allowed_states} -> {self.state_condition} ... "
    )
    # If the state is expected to match
    if self.state_condition:
        query = session.query(DR).filter(
            DR.dag_id == self.external_dag_id,
            DR.state.notin_(self.allowed_states),
        )
    # If the state is not expected to match
    else:
        query = session.query(DR).filter(
            DR.dag_id == self.external_dag_id,
            DR.state.in_(self.allowed_states),
        )
    # Filter by the last dag_run; could use max(execution_date) as well, but
    # avoid that aggregation by sorting dag_runs chronologically, descending
    query = query.order_by(DR.execution_date.desc()).first()
    session.commit()
    return not query
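The query logic above can be mirrored with plain Python over an in-memory list of runs, which makes its behaviour easy to check. Here latest_run_ok and the (execution_date, state) tuples are illustrative stand-ins, not Airflow APIs:

```python
def latest_run_ok(dag_runs, allowed_states, state_condition):
    """Mirror the poke() above: ``dag_runs`` is a list of
    (execution_date, state) tuples.

    With state_condition=True we look for any run *outside* the allowed
    states; with False, for any run *inside* them. Like ``not query`` on
    the ``.first()`` result, the poke succeeds only when no such run exists.
    """
    if state_condition:
        matches = [r for r in dag_runs if r[1] not in allowed_states]
    else:
        matches = [r for r in dag_runs if r[1] in allowed_states]
    # order_by(execution_date.desc()).first() picks the newest match;
    # the poke returns True only when there was no match at all.
    newest = max(matches, key=lambda r: r[0], default=None)
    return newest is None


runs = [(1, "success"), (2, "running")]
# Expecting every run to be in the allowed states: the "running" run blocks.
assert latest_run_ok(runs, ["success"], state_condition=True) is False
```

Note that, like the SQL version, this inspects every run of the DAG rather than only the latest one; the sensor keeps poking until the filter finds no offending run.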