如何解决在 Airflow 2.0 中使用 Taskflow API 传递争论
我正在使用 REST API 将参数传递给基于任务流的 Dag。看看这个论坛上提出的类似问题,下面似乎是访问传递参数的常用方法。
#From inside a template field or file:
{{ dag_run.conf['key'] }}
#Or when context is available,e.g. within a python callable of the Pythonoperator:
context['dag_run'].conf['key']
@dag(default_args=default_args,schedule_interval=None,start_date=days_ago(2),params=None)
def classic_xgb(**context):
"""
### TaskFlow API Tutorial Documentation
[here](https://airflow.apache.org/docs/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
print("context is ",context)
输出是
任何帮助将不胜感激。
谢谢
此致,
阿迪尔
解决方法
有一个新函数 get_current_context()
可以在 Airflow 2.0 中获取上下文。获得上下文字典后,“params”键包含通过 REST API 发送到 Dag 的参数。下面的代码解决了这个问题。
from airflow.operators.python import task,get_current_context
default_args = {
'owner': 'airflow',}
@dag(default_args=default_args,schedule_interval=None,start_date=days_ago(2))
def classic_xgb(**kwargs):
"""
@task()
def extract():
context = get_current_context()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。