如何解决评估 Airflow 中的模板后如何返回对象?
我们正在设计一个变量选择和参数设置器逻辑,当 DAG 被触发时需要对其进行评估。我们的 DAG 在执行之前生成。我们决定将静态代码修改为自定义宏。
直到此时,在运算符定义之间定义了一个代码,因此当 DAG 生成器代码生成 DAG 时,它正在运行。此代码无法处理用于选择正确 Airflow 变量的运行时参数。
for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
dag_id = "TableLoader_" + str(table_name)
default_dag_args={...}
schedule = None
globals()[dag_id] = create_dag(dag_id,schedule,default_dag_args)
def create_dag(dag_id,default_dag_args):
with DAG(
default_args=default_dag_args,dag_id=dag_id,schedule_interval=schedule,user_defined_macros={ "load_type_handler": load_type_handler }
) as dag:
# static python code which sets pipeline_rt_args for all generated DAGs the same way
# this static code Could set only one type (INITIAL or INCREMENTAL)
# but we want to decide during the execution Now
# Operator DeFinitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),# ---> Can't handle runtime parameters <---
runtime_args=pipeline_rt_args,# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
现在我们想在从 UI 或 REST API 触发 DAG 时传递 load_type
(例如:INITIAL
,INCREMENTAL
),因此我们需要修改这个旧的(静态) 行为(仅处理一种情况,但不能同时处理两种情况)以获得正确的气流变量并为我们的 CloudDataFusionStartPipelineOperator
创建正确的对象:
例如:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
但是如果我们做这样的事情:
def create_dag(dag_id,default_dag_args):
def extend_runtime_args(prefix,param,field_name,date,job_id):
# reading the Trigger-time parameter
load_type = param.conf["load_type"]
# getting the proper Airflow Variable (depending on current load type)
result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]
# setting 'job_id','dateValue','date','GCS_Input_Path' for CloudDataFusionStartPipelineOperator
# ...
return rt_args
with DAG( #...
user_defined_macros={
"extend_runtime_args": extend_runtime_args
}) as dag:
# removed static code (which executes only in generation time)
# Operator DeFinitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),# ---> handles runtime arguments with custom macro <---
runtime_args="""{{ extend_runtime_args('PREFIX',dag_run,'runtime_args',macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"),ti.job_id) }}""",# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
注意:
这里我们需要的是自定义逻辑的“未来”评估(不在 DAG 生成时评估),它将返回一个对象,这就是我们在这里使用模板的原因。
我们遇到以下情况:
- 在自定义宏函数
extend_runtime_args
中,返回类型是一个对象 - 在评估 Jinja 模板后,返回类型更改为字符串
-
CloudDataFusionStartPipelineOperator
失败,因为runtime_args
属性是字符串而不是对象
问题:
- 我们如何在评估 Jinja 模板后返回一个对象(并在“未来”这样做)?
- 我们可以以某种方式转换字符串吗?
- 我们如何确保此处的逻辑会在 DAG 执行后执行,而不是在 DAG 生成后立即执行?
- 这里用于处理触发时间参数的 Jinja 模板/自定义宏是好的还是坏的模式?
解决方法
我们如何在评估 Jinja 模板后返回一个对象 (并在“未来”这样做)?
您可以创建自己的自定义运算符,派生自 CloudDataFusionStartPipelineOperator
并使其接受字符串并将其转换为 CloudDataFusionStartPipelineOperator
所需的对象并使用此新运算符。 “runtime_args”是一个字典,所以我相信它应该像json.loads()
一样容易恢复。
我们可以以某种方式转换字符串吗?
是的。只是上面的 json.loads() 代码应该做。此外,如果您在 runtime_args 中只有几个参数可以更改,那么拥有多个宏并直接在字典中的多个 JINJA 字符串中返回更改的值可能会更容易。类似的东西:
runtime_args = {
'PREFIX' = "{{ dag_run }}",'date' = "{{ macros.ds_format(....) }}",}
当存在模板化字段时,Airflow 会递归处理字典或列表等基本结构,因此您可以保留对象结构,并将 jinja 宏用作值(实际上您也可以将 jinja 宏用作键等)。
我们如何保证这里的逻辑会在 DAG 被执行后被执行? 执行而不是在 DAG 生成后立即执行?
JINJA 模板仅在执行任务时进行评估。所以你在这里很好。
这里的 Jinja 模板/自定义宏是好的还是坏的模式? 处理触发时间参数?
很好的模式。这就是他们的目的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。