如何解决自定义 Airflow 回调插件不起作用的问题
我们的代码库中有许多相同的 Slack 回调函数被重复粘贴了很多次,所以我希望把它们移到一个单独的插件中,并从那里调用这些函数。但是,我遇到的问题是代码甚至无法访问该插件。错误出现在我的 try/except 块中,这说明我在 DAG 中做错了什么,但我一直没能找出原因。
DAG 代码:
# DAG file from the question: /opt/airflow/dags/slack_plugin_test.py
from airflow import DAG
from airflow.operators.slack_airflow_plugins import SlackSuccessAlert
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
import urllib


def slack_callback(context):
    """Build a SlackSuccessAlert from the task context and run it.

    Args:
        context: Airflow task context dict; ``task_instance`` and
            ``execution_date`` are read from it.

    Returns:
        Whatever ``SlackSuccessAlert.execute`` returns.
    """
    operator = SlackSuccessAlert(
        dag_id=context.get("task_instance").dag_id,
        task_id=context.get("task_instance").task_id,
        execution_date=context.get("execution_date").isoformat(),
    )
    return operator.execute(context=context)


default_args = {
    "depends_on_past": False,
    'start_date': datetime(2021, 5, 1),
}

dag = DAG(
    dag_id='slack_plugin_test',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
)

# Single dummy task whose success triggers the Slack callback under test.
success_task_test = DummyOperator(
    task_id='my_task',
    provide_context=True,
    on_success_callback=slack_callback,
    dag=dag,
)

success_task_test
# Plugin file from the question: /opt/airflow/plugins/slack_airflow_plugins.py
from airflow.utils.decorators import apply_defaults
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.models import BaseOperator
from airflow.hooks.base_hook import BaseHook
import urllib
import os
from airflow.plugins_manager import AirflowPlugin

# Slack Alerts
# HOSTNAME variable set in docker-compose
base_url = "https://" + os.getenv('HOSTNAME')
slack_channel = 'airflow-alerts-test'
slack_conn_id = 'slack'
slack_token = str(BaseHook.get_connection("slack").password)


class SlackSuccessAlert(BaseOperator):
    """Operator that posts a "DAG succeeded" message with a link to the DAG graph.

    NOTE: this is the code as asked in the question. ``task_id`` is captured
    by ``__init__``'s own signature and is not forwarded to
    ``BaseOperator.__init__``, which is where the traceback shown further
    down ("Argument ['task_id'] is required") is raised.
    """

    @apply_defaults
    def __init__(
            self, dag_id, task_id, execution_date,
            emoji=':green_check:', message='has succeeded',
            *args, **kwargs):
        """Store the values needed to build the Slack message.

        Args:
            dag_id: Id of the DAG the alert is about.
            task_id: Id of the task the alert is about.
            execution_date: Execution date as a string; URL-encoded in
                ``execute``.
            emoji: Slack emoji code placed at the start of the message.
            message: Link text describing the outcome.
        """
        super().__init__(*args, **kwargs)
        self.dag_id = dag_id
        self.task_id = task_id
        self.execution_date = execution_date
        self.emoji = emoji
        self.message = message

    def execute(self, context):
        """Post the alert to Slack.

        Returns:
            The result of ``SlackAPIPostOperator.execute``, or ``None`` if
            posting raised (the failure is only printed).
        """
        encoded_execution_date = urllib.parse.quote_plus(self.execution_date)
        dag_url = (f'{base_url}/graph?dag_id={self.dag_id}'
                   f'&execution_date={encoded_execution_date}')
        slack_message = (f'{self.emoji} *{self.dag_id}* <{dag_url}|*{self.message}*>.')
        slack_alert = SlackAPIPostOperator(
            task_id='slack_alert',
            channel=slack_channel,
            token=slack_token,
            text=slack_message,
        )
        try:
            return slack_alert.execute(context=context)
        except Exception:
            # Best-effort: a failed Slack post must not fail the callback.
            print("something going wrong with the callback...")


class SlackAlertsTest(AirflowPlugin):
    """
    Custom Slack Plugins for Airflow
    """
    name = "slack_airflow_plugins"
    operators = [SlackSuccessAlert]
我可能没有正确地传递 context,但我不确定具体哪里做错了。除了下面附上的 Airflow 错误之外,我在 try/except 中没有收到任何其他错误。所以看起来我的代码甚至还没有执行到能够正确访问插件的那一步。
我想传入 dag_id 和 execution_date,以便 Slack 警报能够构建正确的 URL,并链接到刚刚成功/失败的 DAG。
我收到的错误是:
[2021-05-03 22:25:33,689] {taskinstance.py:1052} ERROR - Failed when executing success callback
[2021-05-03 22:25:33,689] {taskinstance.py:1053} ERROR - Argument ['task_id'] is required
Traceback (most recent call last):
File "/opt/python/lib/python3.8/site-packages/airflow/models/taskinstance.py",line 1050,in _run_raw_task
task.on_success_callback(context)
File "/opt/airflow/dags/slack_plugin_test.py",line 11,in slack_callback
operator = SlackSuccessAlert(
File "/opt/python/lib/python3.8/site-packages/airflow/utils/decorators.py",line 98,in wrapper
result = func(*args,**kwargs)
File "/opt/airflow/plugins/slack_airflow_plugins.py",line 30,in __init__
super().__init__(*args,**kwargs)
File "/opt/python/lib/python3.8/site-packages/airflow/utils/decorators.py",line 94,in wrapper
raise AirflowException(msg)
airflow.exceptions.AirflowException: Argument ['task_id'] is required
解决方法
经过一番苦思冥想,我想出了一个解决方案。我没有用类来创建插件,而是把 Slack 回调移到了一个只包含函数的单独 Python 文件中,然后将其作为模块导入并调用其中的函数。
在此示例中,我导入的“插件”将被称为 basicplugintest
下面是一个示例 DAG,可以用来测试“触发”、“成功”和“失败”回调。您会注意到 'trigger' 和 'success' 都使用了 Airflow 的 on_success_callback 功能;借助 functools 的 partial,您可以在插件中自定义/创建分支(case)语句,从而少写几行代码。
最困难的部分是弄清楚如何使用 context,以及如何使用 partial 来自定义某些回调。
DAG
from airflow import DAG
import basicplugintest #this is the 'plugin' I created
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime,timedelta
import urllib
from functools import partial
# The 'started' and 'success' notifications are both on_success callbacks, so
# callback_type tells the plugin which one to send (it branches with if/elif).
# Best reserved for trigger/success callbacks only.
# callback_type must come after context in the signature.
def trigger_success_callback(context, callback_type):
    """Forward an on_success event to ``basicplugintest.success_callback``."""
    basicplugintest.success_callback(context, callback_type=callback_type)
def retry_callback(context):
    """Forward an on_retry event to ``basicplugintest.retry_callback``."""
    notify = basicplugintest.retry_callback
    notify(context)
def failure_callback(context):
    """Forward an on_failure event to ``basicplugintest.failure_callback``."""
    notify = basicplugintest.failure_callback
    notify(context)
# Defaults applied to every task in this DAG.
default_args = {
    "depends_on_past": False,
    'start_date': datetime(2021, 5, 1),
}

# Tasks created inside the ``with`` block are attached to ``dag``.
with DAG(
    dag_id='slack_plugin_test_functions',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
) as dag:
    # callback_type selects the matching branch in the separate plugin module.
    start_task = DummyOperator(
        task_id='start_task',
        provide_context=True,
        on_success_callback=partial(trigger_success_callback, callback_type='started'),
    )

    success_task = DummyOperator(
        task_id='success_task',
        on_success_callback=partial(trigger_success_callback, callback_type='success'),
    )

    # Exits non-zero so the retry and failure callbacks both get exercised.
    fail_task = BashOperator(
        task_id='fail_task',
        bash_command='exit 123',
        retries=1,
        retry_delay=timedelta(seconds=10),
        on_retry_callback=retry_callback,
        on_failure_callback=failure_callback,
    )

    start_task >> success_task >> fail_task
这是插件:
# added in the dags folder instead of plugins
# functions are referenced by the dags themselves
from airflow.models import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
import urllib
from airflow.plugins_manager import AirflowPlugin
import os
import traceback
# Slack alert settings shared by the callback functions in this module.
# Base URL used to build DAG links; read from the AIRFLOW__WEBSERVER__BASE_URL
# environment variable (None when it is unset).
# NOTE(review): the previous comment said "HOSTNAME variable set in
# docker-compose", which does not match the variable read here - confirm
# where AIRFLOW__WEBSERVER__BASE_URL is defined.
base_url = os.getenv('AIRFLOW__WEBSERVER__BASE_URL')
# Channel the alerts are posted to.
slack_channel = 'airflow-alerts-test'
# Token is the password field of the "slack" connection, looked up at import time.
slack_token = str(BaseHook.get_connection("slack").password)
def success_callback(context, callback_type):
    """Post a Slack alert for an ``on_success_callback`` event.

    Args:
        context: Airflow task context dict; ``task_instance`` and
            ``execution_date`` are read from it.
        callback_type: ``'success'`` or ``'started'``; selects the emoji and
            wording. Any other value posts nothing.

    Returns:
        The result of ``SlackAPIPostOperator.execute``, or ``None`` when
        ``callback_type`` is not recognised.
    """
    # A bare ``import urllib`` does not guarantee the ``parse`` submodule is loaded.
    from urllib.parse import quote_plus

    # (emoji, message) per callback type; the two variants differ only here.
    variants = {
        'success': (':green_check:', 'has succeeded'),
        'started': (':airflow:', 'has been triggered'),
    }
    if callback_type not in variants:
        return None
    emoji, message = variants[callback_type]

    task_instance = context.get("task_instance")
    dag_id = task_instance.dag_id
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    # RUN_ID is expected to have been pushed to XCom by a task named 'xcom_run_id'.
    run_id = task_instance.xcom_pull(key='RUN_ID', task_ids='xcom_run_id')
    slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>.')
    slack_alert = SlackAPIPostOperator(
        task_id='slack_alert',
        channel=slack_channel,
        token=slack_token,
        text=slack_message,
    )
    return slack_alert.execute(context=context)
def retry_callback(context):
    """Post a Slack alert when a task is about to be retried.

    Args:
        context: Airflow task context dict; ``task_instance``,
            ``execution_date`` and ``exception`` are read from it.

    Returns:
        The result of ``SlackAPIPostOperator.execute``.
    """
    # A bare ``import urllib`` does not guarantee the ``parse`` submodule is loaded.
    from urllib.parse import quote_plus

    emoji = ':alert:'
    message = 'has missed SLA'
    task_instance = context.get("task_instance")
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    # try_number is reported one higher than the attempt that just ran.
    try_number = task_instance.try_number - 1
    # max_tries equals the configured retries, which excludes the first attempt.
    max_tries = task_instance.max_tries + 1
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = task_instance.xcom_pull(key='RUN_ID', task_ids='xcom_run_id')

    exception = context.get("exception")
    if isinstance(exception, BaseException):
        # Positional arguments: the ``etype`` keyword is gone in Python 3.10+.
        formatted_exception = ''.join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        ).strip()
    else:
        # The context may carry no exception object (None or a plain string).
        formatted_exception = str(exception)

    slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>! '
                     f'Try number {try_number} out of {max_tries}. <!channel|channel>'
                     f'\n*Task:* {task_id}'
                     f'\n*Error:* {formatted_exception}')
    slack_alert = SlackAPIPostOperator(
        task_id='slack_alert',
        channel=slack_channel,
        token=slack_token,
        text=slack_message,
    )
    return slack_alert.execute(context=context)
def failure_callback(context):
    """Post a Slack alert when a task has failed.

    Args:
        context: Airflow task context dict; ``task_instance``,
            ``execution_date`` and ``exception`` are read from it.

    Returns:
        The result of ``SlackAPIPostOperator.execute``.
    """
    # A bare ``import urllib`` does not guarantee the ``parse`` submodule is loaded.
    from urllib.parse import quote_plus

    emoji = ':alert:'
    message = 'has failed'
    task_instance = context.get("task_instance")
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    execution_date = context.get("execution_date").isoformat()
    encoded_execution_date = quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    run_id = task_instance.xcom_pull(key='RUN_ID', task_ids='xcom_run_id')

    exception = context.get("exception")
    if isinstance(exception, BaseException):
        # Positional arguments: the ``etype`` keyword is gone in Python 3.10+.
        formatted_exception = ''.join(
            traceback.format_exception(type(exception), exception, exception.__traceback__)
        ).strip()
    else:
        # The context may carry no exception object (None or a plain string).
        formatted_exception = str(exception)

    slack_message = (f'{emoji} *{dag_id}* {message} for dp_run_id <{dag_url}|*{run_id}*>! <!channel|channel>'
                     f'\n*Task:* {task_id}'
                     f'\n*Error:* {formatted_exception}')
    slack_alert = SlackAPIPostOperator(
        task_id='slack_alert',
        channel=slack_channel,
        token=slack_token,
        text=slack_message,
    )
    return slack_alert.execute(context=context)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。