How to fix Airflow on_failure_callback
Hello, I hope you are all doing well. I would like to ask a question. Recently I have been trying out Airflow and experimenting with it. Here is the situation: everything works fine, and I have two tasks:
- read_csv
- process_file
They work correctly. I deliberately introduced a typo into the pandas DataFrame call to see how the failure callback works and whether it is actually triggered. Judging from the logs, it seems it is not:
'''
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
    task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given
'''
Here is the code:
try:
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    import pandas as pd
    # Setting up Triggers
    from airflow.utils.trigger_rule import TriggerRule
    # For getting Variables from Airflow
    from airflow.models import Variable
    print("All Dag modules are ok ......")
except Exception as e:
    print("Error {} ".format(e))


def read_csv(**context):
    data = [{"name": "Soumil", "title": "Full Stack Software Engineer"},
            {"name": "Nitin"}]
    df = pd.DataFramee(data=data)  # deliberate typo, to trigger the failure callback
    dag_config = Variable.get("VAR1")
    print("VAR 1 is : {} ".format(dag_config))
    context['ti'].xcom_push(key='mykey', value=df)


def process_file(**context):
    instance = context.get("ti").xcom_pull(key='mykey')
    print(instance.head(2))
    return "Process complete "


def on_failure_callback(**context):
    print("Fail works ! ")


with DAG(dag_id="invoices_dag",
         schedule_interval="@once",
         default_args={
             "owner": "airflow",
             "start_date": datetime(2020, 11, 1),
             "retries": 1,
             "retry_delay": timedelta(minutes=1),
             'on_failure_callback': on_failure_callback,
         },
         catchup=False) as dag:

    read_csv = PythonOperator(
        task_id="read_csv",
        python_callable=read_csv,
        op_kwargs={'filename': "Soumil.csv"},
        provide_context=True
    )

    process_file = PythonOperator(
        task_id="process_file",
        python_callable=process_file,
        provide_context=True
    )

    read_csv >> process_file
# ====================================Notes====================================
# all_success -> triggers when all tasks are complete
# one_success -> triggers when one task is complete
# all_done    -> triggers when all tasks are done
# all_failed  -> triggers when all tasks failed
# one_failed  -> triggers when one task failed
# none_failed -> triggers when no task failed
# ==============================================================================
# ============================== Executor====================================
# There are three main types of executor:
# -> Sequential Executor: runs a single task in linear fashion with no parallelism (default, dev)
# -> Local Executor: runs each task in a separate process
# -> Celery Executor: runs each task on a worker node within a multi-node architecture (most scalable)
# ===========================================================================
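The trigger rules listed in the notes can be illustrated with a small pure-Python sketch. This is a toy re-implementation of the decision logic, not Airflow's actual scheduler code (it ignores skipped/upstream_failed states, and assumes upstream states are given as plain strings); the rule names match the `airflow.utils.trigger_rule.TriggerRule` values:

```python
def should_run(trigger_rule, upstream_states):
    """Toy decision: should a task run, given its upstream task states?"""
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "one_success":
        return any(s == "success" for s in upstream_states)
    if trigger_rule == "all_done":
        return all(s in ("success", "failed") for s in upstream_states)
    if trigger_rule == "all_failed":
        return all(s == "failed" for s in upstream_states)
    if trigger_rule == "one_failed":
        return any(s == "failed" for s in upstream_states)
    if trigger_rule == "none_failed":
        return not any(s == "failed" for s in upstream_states)
    raise ValueError("unknown trigger rule: {}".format(trigger_rule))

print(should_run("all_success", ["success", "failed"]))  # False
print(should_run("one_failed", ["success", "failed"]))   # True
```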
Solution
You need to give your function a parameter that can receive the context; this is due to how Airflow invokes on_failure_callback:
def on_failure_callback(context):
print("Fail works ! ")
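The traceback can be reproduced outside Airflow: the scheduler calls the callback with the context dict as a single positional argument, and a `**kwargs`-only signature cannot accept one. A minimal sketch (the dict content here is a stand-in for the real context):

```python
def broken_callback(**context):
    print("Fail works ! ")

def fixed_callback(context):
    print("Fail works ! ")

fake_context = {"task_instance": "dummy"}  # stand-in for Airflow's real context dict

# Airflow does roughly: task.on_failure_callback(context)
try:
    broken_callback(fake_context)
except TypeError as e:
    print(e)  # broken_callback() takes 0 positional arguments but 1 was given

fixed_callback(fake_context)  # prints: Fail works !
```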
Note that with your implementation you cannot tell from the message which task failed, so you may want to add the task details to the error message, for example:
def on_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id} failed in dag {ti.dag_id}")
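To see what that callback would print, here is a quick check with a hypothetical stand-in for the `TaskInstance` object (in a real run, `context['task_instance']` is supplied by Airflow and has many more attributes):

```python
class FakeTaskInstance:
    # hypothetical stand-in; a real TaskInstance is provided by Airflow
    task_id = "read_csv"
    dag_id = "invoices_dag"

def on_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id} failed in dag {ti.dag_id}")

on_failure_callback({"task_instance": FakeTaskInstance()})
# prints: task read_csv failed in dag invoices_dag
```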