评估 Airflow 中的模板后如何返回对象?

如何解决评估 Airflow 中的模板后如何返回对象?

我们正在设计一个变量选择和参数设置器逻辑,当 DAG 被触发时需要对其进行评估。我们的 DAG 在执行之前生成。我们决定将静态代码修改自定义宏。


直到此时,在运算符定义之间定义了一个代码,因此当 DAG 生成代码生成 DAG 时,它正在运行。此代码无法处理用于选择正确 Airflow 变量的运行时参数。

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    default_dag_args={...}
    schedule = None
    globals()[dag_id] = create_dag(dag_id,schedule,default_dag_args)
def create_dag(dag_id,default_dag_args):

    with DAG(
        default_args=default_dag_args,dag_id=dag_id,schedule_interval=schedule,user_defined_macros={ "load_type_handler": load_type_handler }
    ) as dag:

        # static python code which sets pipeline_rt_args for all generated DAGs the same way
        # this static code Could set only one type (INITIAL or INCREMENTAL)
        # but we want to decide during the execution Now

        # Operator DeFinitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),# ---> Can't handle runtime parameters <---
            runtime_args=pipeline_rt_args,# ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

现在我们想在从 UI 或 REST API 触发 DAG 时传递 load_type(例如:INITIALINCREMENTAL),因此我们需要修改这个旧的(静态) 行为(仅处理一种情况,但不能同时处理两种情况)以获得正确的气流变量并为我们的 CloudDataFusionStartPipelineOperator 创建正确的对象:

例如:

{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}

但是如果我们做这样的事情:


def create_dag(dag_id,default_dag_args):

    def extend_runtime_args(prefix,param,field_name,date,job_id):

        # reading the Trigger-time parameter
        load_type = param.conf["load_type"]
        
        # getting the proper Airflow Variable (depending on current load type)
        result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]

        # setting 'job_id','dateValue','date','GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...

        return rt_args


    with DAG( #...
        user_defined_macros={
            "extend_runtime_args": extend_runtime_args
        }) as dag:
        # removed static code (which executes only in generation time)

        # Operator DeFinitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),# ---> handles runtime arguments with custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX',dag_run,'runtime_args',macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"),ti.job_id) }}""",# ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

注意:

这里我们需要的是自定义逻辑的“未来”评估(不在 DAG 生成时评估),它将返回一个对象,这就是我们在这里使用模板的原因。

我们遇到以下情况:

  • 自定义函数 extend_runtime_args 中,返回类型是一个对象
  • 在评估 Jinja 模板后,返回类型更改为字符串
  • CloudDataFusionStartPipelineOperator 失败,因为 runtime_args 属性是字符串而不是对象

问题:

  • 我们如何在评估 Jinja 模板后返回一个对象(并在“未来”这样做)?
    • 我们可以以某种方式转换字符串吗?
  • 我们如何确保此处的逻辑会在 DAG 执行后执行,而不是在 DAG 生成后立即执行?
  • 这里用于处理触发时间参数的 Jinja 模板/自定义宏是好的还是坏的模式?

解决方法

我们如何在评估 Jinja 模板后返回一个对象 (并在“未来”这样做)?

您可以创建自己的自定义运算符,派生自 CloudDataFusionStartPipelineOperator 并使其接受字符串并将其转换为 CloudDataFusionStartPipelineOperator 所需的对象并使用此新运算符。 “runtime_args”是一个字典,所以我相信它应该像json.loads()一样容易恢复。

我们可以以某种方式转换字符串吗?

是的。只是上面的 json.loads() 代码应该做。此外,如果您在 runtime_args 中只有几个参数可以更改,那么拥有多个宏并直接在字典中的多个 JINJA 字符串中返回更改的值可能会更容易。类似的东西:

runtime_args = {
   'PREFIX' = "{{ dag_run }}",'date' = "{{ macros.ds_format(....) }}",}

当存在模板化字段时,Airflow 会递归处理字典或列表等基本结构,因此您可以保留对象结构,并将 jinja 宏用作值(实际上您也可以将 jinja 宏用作键等)。

我们如何保证这里的逻辑会在 DAG 被执行后被执行? 执行而不是在 DAG 生成后立即执行?

JINJA 模板仅在执行任务时进行评估。所以你在这里很好。

这里的 Jinja 模板/自定义宏是好的还是坏的模式? 处理触发时间参数?

很好的模式。这就是他们的目的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?