如何解决如何在 TriggerDagRunOperator
我正在尝试从另一个 DAG(来自 parent_dag 的 target_dag)调用 DAG。
我的 parent_dag 代码是:
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import Pythonoperator
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunoperator
def read_Metadata(**kwargs):
asqldb_kv = Variable.get("asql_kv")
perfom some operations based on asqldb_kv and populate the result_dictionary list
if len(result_dictionary) > 0:
my_var = Variable.set("num_runs",len(result_dictionary))
ti = kwargs['ti']
ti.xcom_push(key='res',value=result_dictionary)
default_args = {
'start_date': datetime(year=2021,month=6,day=19),'provide_context': True
}
with DAG(
dag_id='parent_dag',default_args=default_args,schedule_interval='@once',description='Test Trigger DAG'
) as dag:
trigger = TriggerDagRunoperator(
task_id="test_trigger_dagrun",python_callable=read_Metadata,trigger_dag_id="target_dag"
)
我收到以下错误:
airflow.exceptions.AirflowException: Invalid arguments were passed to TriggerDagRunoperator (task_id: test_trigger_dagrun). Invalid arguments were:
**kwargs: {'python_callable': <function read_Metadata at 0x7ff5f4159620>}
感谢任何帮助。
编辑:
python_callable 在 TriggerDagRunoperator - Airflow 2.0 中折旧。
我的要求是:
我需要访问 Azure Synapse 并获取一个变量(比如 3)。根据检索到的变量,我需要动态创建任务。比如说,如果 Synapse 有 3 个,那么我需要创建 3 个任务。
我的想法是:
DAG 1 - 访问 Azure 突触并获取变量。将此更新为气流变量。使用 TriggerDagRunoperator 触发 DAG2。
DAG 2 - 根据 DAG 1 中更新的气流变量创建任务。
任何输入我怎样才能做到这一点?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。