微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法在带有气流的 jinja 模板中使用 python 变量

如何解决无法在带有气流的 jinja 模板中使用 python 变量

我正在尝试使用 Airflow 在 AWS EMR 上运行 11 步并遵循此 code 作为参考。由于使用 EmrAddStepsOperator 和 EmrStepSensor 进行 11 个步骤会重复太多。所以我试图遍历它。我在 DAG 中使用了以下代码

step_adder = list()
step_checker = list()
steps = ['step1','step2','step3','step4','step5','step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context,value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow',key='return_value') }}",aws_conn_id='aws_default',steps=eval('step_'+str(i+1)),))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',job_flow_id="{{ task_instance.xcom_pull('create_job_flow',#step_id="{{"task_instance.xcom_pull(task_ids={},key='return_value')[0]",steps[i]}}",step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step,key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',))

在这里遇到一个错误,EmrStepSensor 期望 EMR 中的 step_id 在这里输入,并且是从 xcom 获取生成的(我猜,我不是 100% 确定此代码是如何工作的)。但是我的步骤存储在步骤列表中,所以我不能在 step_id 的 task_id 中给出静态值,就像在参考代码中给出的一样,我无法弄清楚如何使用带有 python 变量值的 jinja 模板来放置值从步骤列表中。

我使用了以下两种方式,以便 step_id 可以根据步骤 [i] 中的步骤名称从 EMR 中获取正确的步骤

step_id="{{"task_instance.xcom_pull(task_ids={},key='return_value')[0] }}")

然而,这两个都失败了,在 Airflow 中出现了语法错误。因此,如果有人能指出我正确的方向来做到这一点,我将不胜感激。我使用的是 Airflow 1.10.12(这是 AWS 上 Managed Apache Airflow 中 Airflow 的认版本)。

解决方法

我不确定这是否已经解决,所以:

使用 f 字符串:

f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}',key='return_value')[0] }}}}"

使用 .format"{{{{ task_instance.xcom_pull(task_ids='{}',key='return_value')[0] }}}}".format(steps[i])

请注意,您必须确保键 task_ids 的值用单引号括起来。此外,从 xcom_pull 返回的是一个列表,因此末尾的索引 [0] o

,

你可以这样做:

step_id= " {{{{ task_instance.xcom_pull(task_ids={},key='return_value') }}}} ".format(steps[i])

或使用f-string

step_id= f" {{{{ task_instance.xcom_pull(task_ids={steps[i]},key='return_value') }}}} "

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。