Airflow 2.0.2: DAG does not render the template correctly


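I have two simple tasks: one fetches a list of ids, and the other should print that list with the echo command. The XCom push looks correct: the callable's return value (pushed to XCom) is a list of tuples, as follows: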

[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
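Each element is a one-element tuple because get_records returns the cursor's rows, one tuple per row (here each row has a single column). A minimal stdlib sketch, using the ids above as hard-coded sample data with no database involved, of taking the first column of every row, which is what item[0] does in the template:

```python
# Sample rows copied from the XCom value above; in the DAG they come from
# PostgresHook.get_records, which returns one tuple per result row.
records = [(19343160,), (19350561,), (19351381,), (19351978,), (19356674,),
           (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]

# Take column 0 of every row, mirroring `item[0]` in the Jinja loop.
query_ids = [row[0] for row in records]

# Print the commands the template is expected to render.
for query_id in query_ids:
    print(f"echo {query_id};")
```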

Here is my code:


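from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

# sql_PATH is not shown in the original snippet; it is assumed to hold
# the directory that contains the .sql files.


def read_sql(file_name):
    # Read the SQL text from disk (runs when the DAG file is parsed).
    with open(sql_PATH + file_name) as f:
        sql = f.read()
    return sql


def query_and_push(sql):
    # Run the query on Redshift; the returned rows are pushed to XCom
    # under the default key 'return_value'.
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    records = pg_hook.get_records(sql=sql)
    return records


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'xcom_using_jinja_template',
    default_args=default_args,
    description='',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['test'],
) as dag:

    # provide_context is deprecated in Airflow 2 and no longer needed, so it is omitted.
    t1 = PythonOperator(
        task_id='get_query_id',
        python_callable=query_and_push,
        op_kwargs={'sql': read_sql('warmupqueryid.sql')},
    )

    templated_command = dedent(
        """
    {% for item in params.query_ids %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )

    # Intended: hand the get_query_id XCom to the template through params.
    t2 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"},
    )

    t1 >> t2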

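This error makes my last task fail, and I don't understand why it doesn't get the value from the XCom push. I'm not sure whether this is a bug or whether I've just missed something.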


*** Reading remote log from s3://ob-airflow-pre/logs/xcom_using_jinja_template/templated/2021-05-26T17:22:44.023533+00:00/1.log.
[2021-05-26 17:22:45,633] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:1068} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,663] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-26 17:22:45,664] {taskinstance.py:1070} INFO - 
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,675] {taskinstance.py:1089} INFO - Executing <Task(BashOperator): templated> on 2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,679] {standard_task_runner.py:52} INFO - Started process 413 to run task
[2021-05-26 17:22:45,683] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'xcom_using_jinja_template', 'templated', '2021-05-26T17:22:44.023533+00:00', '--job-id', '1811', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/xcom_test.py', '--cfg-path', '/tmp/tmpkk2x0gyd', '--error-file', '/tmp/tmpc2ka7x4x']
[2021-05-26 17:22:45,683] {standard_task_runner.py:77} INFO - Job 1811: Subtask templated
[2021-05-26 17:22:45,859] {logging_mixin.py:104} INFO - Running <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-26 17:22:45,945] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_using_jinja_template
AIRFLOW_CTX_TASK_ID=templated
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T17:22:44.023533+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,946] {bash.py:135} INFO - Tmp dir root location: 
 /tmp
[2021-05-26 17:22:45,947] {bash.py:158} INFO - Running command: 

    echo  ;

    echo {;

    echo {;

    echo  ;

    echo t;

    echo i;

    echo .;

    echo x;

    echo c;

    echo o;

    echo m;

    echo _;

    echo p;

    echo u;

    echo l;

    echo l;

    echo (;

    echo t;

    echo a;

    echo s;

    echo k;

    echo _;

    echo i;

    echo d;

    echo s;

    echo =;

    echo ';

    echo g;

    echo e;

    echo t;

    echo _;

    echo q;

    echo u;

    echo e;

    echo r;

    echo y;

    echo _;

    echo i;

    echo d;

    echo ';

    echo );

    echo ,;

    echo  ;

    echo k;

    echo e;

    echo y;

    echo =;

    echo ';

    echo r;

    echo e;

    echo t;

    echo u;

    echo r;

    echo n;

    echo _;

    echo v;

    echo a;

    echo l;

    echo u;

    echo e;

    echo ';

    echo  ;

    echo };

    echo };

[2021-05-26 17:22:45,954] {bash.py:169} INFO - Output:
[2021-05-26 17:22:45,955] {bash.py:173} INFO - 
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - t
[2021-05-26 17:22:45,955] {bash.py:173} INFO - i
[2021-05-26 17:22:45,955] {bash.py:173} INFO - .
[2021-05-26 17:22:45,955] {bash.py:173} INFO - x
[2021-05-26 17:22:45,955] {bash.py:173} INFO - c
[2021-05-26 17:22:45,955] {bash.py:173} INFO - o
[2021-05-26 17:22:45,955] {bash.py:173} INFO - m
[2021-05-26 17:22:45,955] {bash.py:173} INFO - _
[2021-05-26 17:22:45,955] {bash.py:173} INFO - p
[2021-05-26 17:22:45,956] {bash.py:173} INFO - u
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: syntax error near unexpected token `;'
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: `    echo (;'
[2021-05-26 17:22:45,956] {bash.py:177} INFO - Command exited with return code 1
[2021-05-26 17:22:45,976] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 180, in execute
    raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2021-05-26 17:22:45,978] {taskinstance.py:1525} INFO - Marking task as UP_FOR_RETRY. dag_id=xcom_using_jinja_template, task_id=templated, execution_date=20210526T172244, start_date=20210526T172245, end_date=20210526T172245
[2021-05-26 17:22:46,014] {local_task_job.py:146} INFO - Task exited with return code 1
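The rendered command above is what you get when the Jinja loop runs over a plain string instead of a list: iterating over a string yields single characters, and item[0] of a one-character string is that same character. The character "(" then produces echo (; which bash rejects with the syntax error shown. A stdlib-only sketch that reproduces the rendered lines without Airflow or Jinja (the list comprehension is an emulation of the {% for %} loop, not Airflow's rendering code):

```python
# The params value from the question; its characters are exactly what the
# log echoes. Because params is not templated, the loop receives this literal string.
query_ids = " {{ ti.xcom_pull(task_ids='get_query_id'), key='return_value' }}"

# Emulate `{% for item in params.query_ids %} echo {{ item[0] }}; {% endfor %}`:
# iterating a str yields characters, and item[0] of a 1-char str is itself.
rendered = [f"echo {item[0]};" for item in query_ids]

# The first four lines match the start of the logged command.
for line in rendered[:4]:
    print(line)
```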

When I replace params.query_ids with the (hard-coded) list above, I get what I expected:

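    templated_command = dedent(
        """
    {% for item in [(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)] %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )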

Expected result:

[2021-05-27 10:59:05,887] {bash.py:158} INFO - Running command: 
    
        echo 19343160;
    
        echo 19350561;
    
        echo 19351381;
    
        echo 19351978;
    
        echo 19356674;
    
        echo 19356676;
    
        echo 19356678;
    
        echo 19356681;
    
        echo 19356682;
    
        echo 19359607;

Solution

I answered this on the Astronomer forum, but I'm posting the answer here too in case it helps anyone else.

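You can't use params inside bash_command as a Jinja template this way, because params is not one of BashOperator's template_fields: the string you put in params is passed into the template as-is and is never rendered, so the for loop iterates over its characters. However, you can reference the return_value XCom of the get_query_id task as a variable directly in Jinja, as follows: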

    # Inside the `with DAG(...) as dag:` block from the question.
    templated_command = dedent(
        """
    {% set query_ids = ti.xcom_pull(task_ids='get_query_id', key='return_value') %}
    {% for item in query_ids %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )

    t2 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
    )

Now templated_command references the XCom it needs directly, assigns it to a variable inside the Jinja string, and produces the output you expect.
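Inside the {% set %} expression, ti.xcom_pull(...) returns the deserialized Python object rather than a string, so the loop iterates over rows. One detail worth knowing, assuming the default XCom backend with enable_xcom_pickling left at its Airflow 2 default of False: XCom values are stored as JSON, and tuples do not survive a JSON round trip, so the pulled value is a list of lists. item[0] works the same either way. A stdlib sketch of that round trip, using a few of the ids above as sample data:

```python
import json

# Rows as returned by get_records (one tuple per row); sample ids from above.
records = [(19343160,), (19350561,), (19359607,)]

# Emulate JSON XCom serialization: tuples come back as lists.
pulled = json.loads(json.dumps(records))
print(pulled)  # [[19343160], [19350561], [19359607]]

# Emulate the template loop over the pulled value.
commands = [f"echo {item[0]};" for item in pulled]
print("\n".join(commands))
```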
