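Custom Airflow callback plugin not working

How to fix a custom Airflow callback plugin that is not working

We have the same Slack callback functions pasted many times throughout our codebase, so I wanted to move them into a separate plugin and call the functions from there. However, I'm running into problems where the code can't even reach the plugin. I'm getting an error in my try/except block, which means I'm doing something wrong in my DAG, but I haven't had any luck finding it.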

DAG code

from airflow import DAG
from airflow.operators.slack_airflow_plugins import SlackSuccessAlert
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
import urllib


def slack_callback(context):
    operator = SlackSuccessAlert(
        dag_id=context.get("task_instance").dag_id,
        task_id=context.get("task_instance").task_id,
        execution_date=context.get("execution_date").isoformat(),
    )

    return operator.execute(context=context)

default_args = {
    "depends_on_past": False,
    'start_date': datetime(2021, 5, 1),
}


dag = DAG(
    dag_id='slack_plugin_test',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
)

success_task_test = DummyOperator(
    task_id='my_task',
    provide_context=True,
    on_success_callback=slack_callback,
    dag=dag,
)

success_task_test

Plugin code

from airflow.utils.decorators import apply_defaults
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.models import BaseOperator
from airflow.hooks.base_hook import BaseHook
import urllib.parse  # import the submodule explicitly; execute() calls urllib.parse.quote_plus
import os
from airflow.plugins_manager import AirflowPlugin


# Slack Alerts
# HOSTNAME variable set in docker-compose
base_url = "https://"+os.getenv('HOSTNAME')
slack_channel = 'airflow-alerts-test'
slack_conn_id = 'slack'
slack_token = str(BaseHook.get_connection("slack").password)


class SlackSuccessAlert(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            dag_id,
            task_id,
            execution_date,
            emoji=':green_check:',
            message='has succeeded',
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_id = dag_id
        self.task_id = task_id
        self.execution_date = execution_date
        self.emoji = emoji
        self.message = message

    def execute(self,context):
        #dag_id = self.dag_id
        #task_id = self.task_id
        #execution_date = self.execution_date
        encoded_execution_date = urllib.parse.quote_plus(self.execution_date)
        dag_url = (f'{base_url}/graph?dag_id={self.dag_id}'
                   f'&execution_date={encoded_execution_date}')
        slack_message = (f'{self.emoji} *{self.dag_id}* <{dag_url}|*{self.message}*>.')
        slack_alert = SlackAPIPostOperator(
            task_id='slack_alert',
            channel=slack_channel,
            token=slack_token,
            text=slack_message,
        )
        try:
            return slack_alert.execute(context=context)
        except:
            print("something going wrong with the callback...")



class SlackAlertsTest(AirflowPlugin):
    """
    Custom Slack Plugins for Airflow
    """
    name = "slack_airflow_plugins"
    operators = [SlackSuccessAlert]

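I may be passing the context incorrectly, but I'm not sure what I'm doing wrong here. I'm not getting any error from the try/except other than the Airflow error I've added below, so it looks like my code isn't even getting far enough to access the plugin properly.

I'm trying to pass dag_id and execution_date so that the Slack alert builds the correct URL and links to the DAG that just succeeded/failed.

The error I'm receiving is: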

[2021-05-03 22:25:33,689] {taskinstance.py:1052} ERROR - Failed when executing success callback
[2021-05-03 22:25:33,689] {taskinstance.py:1053} ERROR - Argument ['task_id'] is required
Traceback (most recent call last):
  File "/opt/python/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1050, in _run_raw_task
    task.on_success_callback(context)
  File "/opt/airflow/dags/slack_plugin_test.py", line 11, in slack_callback
    operator = SlackSuccessAlert(
  File "/opt/python/lib/python3.8/site-packages/airflow/utils/decorators.py", line 98, in wrapper
    result = func(*args, **kwargs)
  File "/opt/airflow/plugins/slack_airflow_plugins.py", line 30, in __init__
    super().__init__(*args, **kwargs)
  File "/opt/python/lib/python3.8/site-packages/airflow/utils/decorators.py", line 94, in wrapper
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['task_id'] is required
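
The traceback ends at `super().__init__(*args, **kwargs)` inside the plugin's `__init__`. Because that `__init__` declares `task_id` as one of its own named parameters, the value is bound locally and is no longer in `**kwargs` by the time the base initializer runs. The following is only a plain-Python sketch of that mechanism: `Base`, `Swallows`, `Forwards`, and `try_build` are hypothetical stand-ins, not Airflow classes, and the error text is copied from the log above purely for illustration.

```python
class Base:
    """Hypothetical stand-in for a base class that requires task_id."""

    def __init__(self, task_id=None, **kwargs):
        if task_id is None:
            raise ValueError("Argument ['task_id'] is required")
        self.task_id = task_id


class Swallows(Base):
    # task_id is a named parameter here, so it is bound locally and is
    # NOT part of **kwargs when the base initializer is called.
    def __init__(self, dag_id, task_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_id = dag_id
        self.task_id = task_id


class Forwards(Base):
    # task_id stays inside **kwargs, so the base class receives it.
    def __init__(self, dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dag_id = dag_id


def try_build(cls):
    """Return the error message raised while constructing cls, or None."""
    try:
        cls(dag_id="slack_plugin_test", task_id="my_task")
    except ValueError as err:
        return str(err)
    return None


print(try_build(Swallows))  # the base class never sees task_id
print(try_build(Forwards))  # construction succeeds
```

Under this reading, leaving `task_id` out of the subclass signature (or passing it on explicitly to `super().__init__`) would be one way to get past this particular error.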

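Solution

After banging my head against this for a while, I came up with a solution. Instead of creating a plugin with a class, I simply moved my Slack callbacks into a separate Python file that contains only functions, imported it as a module, and called the functions from there. In this example, the 'plugin' I import is called basicplugintest.

So, here is a sample DAG that lets me test the 'trigger', 'success', and 'failure' callbacks. You'll notice that 'trigger' and 'success' both use Airflow's on_success_callback, and with partial from functools you can add a custom argument and write a case statement in the plugin, which shortens the code by a few lines.

The hardest part was figuring out how to use context while also using partial to customize certain callbacks.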
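
As a minimal sketch of how `partial` and `context` fit together: binding `callback_type` by keyword leaves a callable that still accepts `context` as its only positional argument, which matches the `task.on_success_callback(context)` call shown in the traceback. The `fake_context` dict and the returned string are illustrative assumptions, not what Airflow actually passes or expects.

```python
from functools import partial


def trigger_success_callback(context, callback_type):
    """Return a label built from the context and the pre-bound callback_type."""
    return f"{context['dag_id']}: {callback_type}"


# Bind callback_type by keyword; the resulting callable still takes
# `context` as its single positional argument.
on_started = partial(trigger_success_callback, callback_type="started")
on_success = partial(trigger_success_callback, callback_type="success")

# Stand-in for the context dict that Airflow would supply at runtime.
fake_context = {"dag_id": "slack_plugin_test_functions"}

print(on_started(fake_context))  # slack_plugin_test_functions: started
print(on_success(fake_context))  # slack_plugin_test_functions: success
```

This is also why `callback_type` has to come after `context` in the function signature: the positional slot must stay free for the argument Airflow supplies.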

DAG

from airflow import DAG
import basicplugintest #this is the 'plugin' I created
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime,timedelta
import urllib
from functools import partial

# callback_type can be specified and used with if/elif statements
# probably best to only use them for trigger/success callbacks since
# they are both considered 'on_success' callbacks
# callback_type must be ordered after context 
def trigger_success_callback(context,callback_type):
    basicplugintest.success_callback(context,callback_type)

def retry_callback(context):
    basicplugintest.retry_callback(context)

def failure_callback(context):
    basicplugintest.failure_callback(context)


default_args = {
    "depends_on_past": False,
    'start_date': datetime(2021, 5, 1),
}


dag = DAG(
    dag_id='slack_plugin_test_functions',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
)

start_task = DummyOperator(
    task_id='start_task',
    provide_context=True,
    # specify the callback_type to correspond to case statement in the separate plugin
    on_success_callback=partial(trigger_success_callback, callback_type='started'),
    dag=dag,
)

success_task = DummyOperator(
    task_id='success_task',
    on_success_callback=partial(trigger_success_callback, callback_type='success'),
    dag=dag,
)

fail_task = BashOperator(
    task_id='fail_task',
    bash_command='exit 123',
    retries=1,
    retry_delay=timedelta(seconds=10),
    on_retry_callback=retry_callback,
    on_failure_callback=failure_callback,
    dag=dag,
)

start_task >> success_task >> fail_task

Here is the plugin:

# added in the dags folder instead of plugins
# functions are referenced by the dags themselves
from airflow.models import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
import urllib.parse  # import the submodule explicitly; the callbacks call urllib.parse.quote_plus
from airflow.plugins_manager import AirflowPlugin
import os
import traceback

# Slack Alerts
# HOSTNAME variable set in docker-compose
base_url = os.getenv('AIRFLOW__WEBSERVER__BASE_URL')
slack_channel = 'airflow-alerts-test'
slack_token = str(BaseHook.get_connection("slack").password)


def success_callback(context, callback_type):
    if callback_type == 'success':
        emoji = ':green_check:'
        message = 'has succeeded'
        dag_id = context.get("task_instance").dag_id
        task_id = context.get("task_instance").task_id
        execution_date = context.get("execution_date").isoformat()
        encoded_execution_date = urllib.parse.quote_plus(execution_date)
        dag_url = (f'{base_url}/graph?dag_id={dag_id}'
                   f'&execution_date={encoded_execution_date}')
        run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')
        slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>.')
        slack_alert = SlackAPIPostOperator(
            task_id='slack_alert',
            channel=slack_channel,
            token=slack_token,
            text=slack_message,
        )
        return slack_alert.execute(context=context)

    elif callback_type == 'started':
        emoji = ':airflow:'
        message = 'has been triggered'
        dag_id = context.get("task_instance").dag_id
        task_id = context.get("task_instance").task_id
        execution_date = context.get("execution_date").isoformat()
        encoded_execution_date = urllib.parse.quote_plus(execution_date)
        dag_url = (f'{base_url}/graph?dag_id={dag_id}'
                   f'&execution_date={encoded_execution_date}')
        # The rest of this branch is assumed to mirror the 'success' branch
        run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')
        slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>.')
        slack_alert = SlackAPIPostOperator(
            task_id='slack_alert',
            channel=slack_channel,
            token=slack_token,
            text=slack_message,
        )
        return slack_alert.execute(context=context)

def retry_callback(context):
    emoji = ':alert:'
    message = 'has missed SLA'
    dag_id = context.get("task_instance").dag_id
    task_id = context.get("task_instance").task_id
    try_number = context.get("task_instance").try_number - 1  # bug in airflow sets try_number 1 higher than it should be
    max_tries = context.get("task_instance").max_tries + 1  # max_tries is equal to number of retries set, which does not include the first attempt
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')

    exception = context.get("exception")
    # Positional arguments work across Python versions (the etype keyword was removed in 3.10)
    formatted_exception = ''.join(
        traceback.format_exception(type(exception), exception, exception.__traceback__)
    ).strip()

    slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>! Try number {try_number} out of {max_tries}. <!channel|channel>'
                     f'\n*Task:* {task_id}'
                     f'\n*Error:* {formatted_exception}')
    # channel and token are assumed to match success_callback
    slack_alert = SlackAPIPostOperator(
        task_id='slack_alert',
        channel=slack_channel,
        token=slack_token,
        text=slack_message,
    )

    return slack_alert.execute(context=context)


def failure_callback(context):
    emoji = ':alert:'
    message = 'has failed'
    dag_id = context.get("task_instance").dag_id
    task_id = context.get("task_instance").task_id
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = context.get("task_instance").xcom_pull(key='RUN_ID', task_ids='xcom_run_id')

    # Exception formatting is assumed to mirror retry_callback
    exception = context.get("exception")
    formatted_exception = ''.join(
        traceback.format_exception(type(exception), exception, exception.__traceback__)
    ).strip()

    slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>! <!channel|channel>'
                     f'\n*Task:* {task_id}'
                     f'\n*Error:* {formatted_exception}')
    # channel and token are assumed to match success_callback
    slack_alert = SlackAPIPostOperator(
        task_id='slack_alert',
        channel=slack_channel,
        token=slack_token,
        text=slack_message,
    )
    return slack_alert.execute(context=context)
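
The callbacks above repeat the same URL and message construction in every branch. One possible way to shorten them, sketched here without any Airflow imports, is to keep the per-type emoji and text in a lookup table and build the URL in one helper. `CALLBACK_STYLES`, `build_dag_url`, `build_message`, and the example hostname are hypothetical names introduced for illustration; they are not part of the original plugin.

```python
from urllib.parse import quote_plus

# Hypothetical lookup table standing in for the if/elif chain on callback_type.
CALLBACK_STYLES = {
    "success": (":green_check:", "has succeeded"),
    "started": (":airflow:", "has been triggered"),
}


def build_dag_url(base_url, dag_id, execution_date):
    """Build the graph-view link, URL-encoding the ISO execution date."""
    encoded = quote_plus(execution_date)
    return f"{base_url}/graph?dag_id={dag_id}&execution_date={encoded}"


def build_message(base_url, dag_id, execution_date, run_id, callback_type):
    """Format the Slack text for a 'success' or 'started' callback."""
    emoji, message = CALLBACK_STYLES[callback_type]
    dag_url = build_dag_url(base_url, dag_id, execution_date)
    return f"{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>."


print(build_message(
    "https://airflow.example.com",  # placeholder base URL
    "slack_plugin_test_functions",
    "2021-05-03T22:25:33+00:00",
    "run-1",
    "started",
))
```

With helpers like these, each callback would only need to pull the values out of `context` and hand the finished text to `SlackAPIPostOperator`, as the functions above already do.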
