如何解决如何在 Airflow 中为长时间运行的任务正确设置警告功能?
我们想在 Airflow(Google 的 Data Composer)中创建一个类似于超时的警告功能,但有一些额外的条件:
- 如果“超时”发生,长时间运行的任务不应失败或跳过,它必须完成
- 我们希望在达到时间限制后发送电子邮件或记录超时事件而不中断我们长时间运行的任务
为此,我使用了一个 Pythonoperator,它试图确定长时间运行的任务是否仍在运行或完成或超时等。
问题是我无法从当前的 Dag Run 中获取有关长时间运行任务的开始时间的信息,而同时存在同一长时间运行任务的状态信息。
在下面的示例中,checker
任务尝试查询 status
和 start time
信息。
此处的开始时间始终为 None 值。分支操作员决定是否需要警告邮件,并相应地进行分支。它读取 XCOM,在那里它可以找到检查器任务的输出。合并任务加入分支并引导执行进入结束任务。
def checker(ti,**kwargs):
elapsed = None
timeout = 120
sleeptime = 10
dag_instance = kwargs['dag']
execution_date = kwargs['execution_date']
monitored_task = dag_instance.get_task("long_process")
while True:
print("Checker:",str(elapsed))
monitored_task_instance = TaskInstance(monitored_task,execution_date)
monitored_task_status = monitored_task_instance.current_state()[0]
print("task_status",monitored_task_status)
monitored_task_instance_start_time = datetime.strptime(monitored_task_instance.start_date,"%Y-%m-%d %H:%M:%s.%f")
elapsed = datetime.Now() - monitored_task_instance_start_time
if monitored_task_status == 'success' and elapsed < timedelta(timeout) :
ti.xcom_push(key="checker_result",value="finished")
print("checker: finished",str(elapsed))
return
if monitored_task_status == 'running' and elapsed >= timedelta(timeout):
ti.xcom_push(key="checker_result",value="timeout")
print("checker: timeout",str(elapsed))
return
time.sleep(sleeptime)
以下代码在 checker
任务中可能无法正常工作(未经测试,请将其视为伪代码):
monitored_task_instance_start_time = datetime.strptime(monitored_task_instance.start_date,"%Y-%m-%d %H:%M:%s.%f")
分支通过XCOM读取checker任务的输出并决定是否选择警告邮件分支。
def branch(**kwargs):
ti = kwargs['ti']
status = ti.xcom_pull(key="checker_result",task_ids=["checker"])[0]
if status == "timeout":
print("branch timeout")
return "mail_sender"
if status == "finished":
print("branch finished")
return "merge"
print("branch",status)
问题:
其余代码:
from datetime import datetime,timedelta,date
import time
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonoperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.python_operator import Pythonoperator
from airflow.models import TaskInstance
from airflow.utils.helpers import parse_template_string
yesterday = datetime.combine(datetime.today() - timedelta(1),datetime.min.time())
default_dag_args = {
'start_date': yesterday,'email_on_failure': False,'email_on_retry': False,'retries': 0,'retry_delay': timedelta(minutes=5),'catchup': False
}
def long_process():
print("long_process start:",str(datetime.Now()))
time.sleep(60)
print("long_process done:",str(datetime.Now()))
def mail_sender():
print("mail_sender: mail sent.")
with DAG(
dag_id='warning_mailer',default_args=default_dag_args,tags=['blablabla'],) as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end',trigger_rule=TriggerRule.ALL_SUCCESS)
task_long_process = Pythonoperator(
task_id="long_process",python_callable=long_process
)
task_checker = Pythonoperator(
task_id="checker",python_callable=checker,provide_context=True
)
task_mail_sender = Pythonoperator(
task_id="mail_sender",python_callable=mail_sender
)
task_branch = BranchPythonoperator(
task_id='branch',python_callable=branch,provide_context=True
)
task_merge = DummyOperator(
task_id='merge',trigger_rule=TriggerRule.ALL_SUCCESS
)
start >> [task_long_process,task_checker]
task_long_process >> end
task_checker >> task_branch >> [task_merge,task_mail_sender]
task_mail_sender >> task_merge >> end
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。