如何在 Airflow 中为长时间运行的任务正确设置警告功能?

如何解决如何在 Airflow 中为长时间运行的任务正确设置警告功能?

我们想在 Airflow(Google 的 Data Composer)中创建一个类似于超时的警告功能,但有一些额外的条件:

  • 如果“超时”发生,长时间运行的任务不应失败或跳过,它必须完成
  • 我们希望在达到时间限制后发送电子邮件或记录超时事件而不中断我们长时间运行的任务

为此,我使用了一个 Pythonoperator,它试图确定长时间运行的任务是否仍在运行或完成或超时等。

问题是我无法从当前的 Dag Run 中获取有关长时间运行任务的开始时间的信息,而同时存在同一长时间运行任务的状态信息。

在下面的示例中,checker 任务尝试查询 statusstart time 信息。 此处的开始时间始终为 None 值。分支操作员决定是否需要警告邮件,并相应地进行分支。它读取 XCOM,在那里它可以找到检查器任务的输出。合并任务加入分支并引导执行进入结束任务。

def checker(ti,**kwargs):

    elapsed = None
    timeout = 120
    sleeptime = 10
    dag_instance = kwargs['dag']
    execution_date = kwargs['execution_date']
    monitored_task = dag_instance.get_task("long_process")

    while True:

        print("Checker:",str(elapsed))
        monitored_task_instance = TaskInstance(monitored_task,execution_date)

        monitored_task_status = monitored_task_instance.current_state()[0]
        print("task_status",monitored_task_status)

        monitored_task_instance_start_time = datetime.strptime(monitored_task_instance.start_date,"%Y-%m-%d %H:%M:%s.%f")
        elapsed = datetime.Now() - monitored_task_instance_start_time

        if monitored_task_status == 'success' and elapsed < timedelta(timeout) :
            ti.xcom_push(key="checker_result",value="finished")
            print("checker: finished",str(elapsed))
            return
        
        if monitored_task_status == 'running' and elapsed >= timedelta(timeout):
            ti.xcom_push(key="checker_result",value="timeout")
            print("checker: timeout",str(elapsed))
            return

        time.sleep(sleeptime)

以下代码checker 任务中可能无法正常工作(未经测试,请将其视为伪代码):

monitored_task_instance_start_time = datetime.strptime(monitored_task_instance.start_date,"%Y-%m-%d %H:%M:%s.%f")

分支通过XCOM读取checker任务的输出并决定是否选择警告邮件分支。

def branch(**kwargs):
    ti = kwargs['ti']
    status = ti.xcom_pull(key="checker_result",task_ids=["checker"])[0]
    if status == "timeout":
        print("branch timeout")
        return "mail_sender"
    if status == "finished":
        print("branch finished")
        return "merge"
    print("branch",status)

问题:

  • 如何正确执行此警告功能
  • 如何正确获取当前 Dag Run 中某个任务的开始时间?

其余代码

from datetime import datetime,timedelta,date
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonoperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.python_operator import Pythonoperator
from airflow.models import TaskInstance
from airflow.utils.helpers import parse_template_string


yesterday = datetime.combine(datetime.today() - timedelta(1),datetime.min.time())
default_dag_args = {
    'start_date': yesterday,'email_on_failure': False,'email_on_retry': False,'retries': 0,'retry_delay': timedelta(minutes=5),'catchup': False
}

def long_process():
    print("long_process start:",str(datetime.Now()))
    time.sleep(60)
    print("long_process done:",str(datetime.Now()))

def mail_sender():
    print("mail_sender: mail sent.")

with DAG(
        dag_id='warning_mailer',default_args=default_dag_args,tags=['blablabla'],) as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end',trigger_rule=TriggerRule.ALL_SUCCESS)
    task_long_process = Pythonoperator(
        task_id="long_process",python_callable=long_process
    )
    task_checker = Pythonoperator(
        task_id="checker",python_callable=checker,provide_context=True
    )
    task_mail_sender = Pythonoperator(
        task_id="mail_sender",python_callable=mail_sender
    )
    task_branch = BranchPythonoperator(
        task_id='branch',python_callable=branch,provide_context=True
    )
    task_merge = DummyOperator(
        task_id='merge',trigger_rule=TriggerRule.ALL_SUCCESS
    )

    start >> [task_long_process,task_checker]
    task_long_process >> end
    task_checker >> task_branch >> [task_merge,task_mail_sender]
    task_mail_sender >> task_merge >> end

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?