How to solve Airflow 2.0.2: DAG does not render the template correctly
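I have two simple tasks: one fetches a list of IDs, and the other should print that list using an echo command. The result of the XCom push appears to be correct. I get a list of tuples like the one below.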
[(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)]
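Each element of this list is one row returned by get_records, and each row has a single column. That is why the template indexes item[0]. Here is a minimal plain-Python sketch of the same indexing, using the first three IDs from the list above:

```python
# Rows as returned by the SQL query in the question: one 1-tuple per row.
records = [(19343160,), (19350561,), (19351381,)]

# item[0] in the Jinja loop takes the first (and only) column of each row.
query_ids = [row[0] for row in records]
print(query_ids)
```

As an alternative, flattening the rows this way inside the PythonOperator callable would let the template use {{ item }} instead of {{ item[0] }}. The question does not do this.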
This is my code:
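from datetime import timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.dates import days_ago

# sql_PATH is not shown in the question; it is assumed to be the folder holding the .sql files.
def read_sql(file_name):
    with open(sql_PATH + file_name) as f:
        sql = f.read()
    return sql

def query_and_push(sql):
    pg_hook = PostgresHook(postgres_conn_id='redshift')
    records = pg_hook.get_records(sql=sql)
    # PythonOperator pushes this return value to XCom under the key 'return_value'.
    return records

default_args = {
    'owner': 'airflow', 'depends_on_past': False, 'email': ['airflow@example.com'],
    'email_on_failure': False, 'email_on_retry': False,
    'retries': 1, 'retry_delay': timedelta(minutes=5),
}

with DAG(
    'xcom_using_jinja_template', default_args=default_args, description='',
    schedule_interval=timedelta(days=1), start_date=days_ago(2), tags=['test'],
) as dag:
    # provide_context is deprecated in Airflow 2 and no longer needed.
    t1 = PythonOperator(
        task_id='get_query_id', python_callable=query_and_push, provide_context=True,
        op_kwargs={'sql': read_sql('warmupqueryid.sql')},
    )

    templated_command = dedent(
        """
    {% for item in params.query_ids %}
        echo {{ item[0] }};
    {% endfor %}
    """
    )

    t2 = BashOperator(
        task_id='templated', depends_on_past=False, bash_command=templated_command,
        params={'query_ids': " {{ ti.xcom_pull(task_ids='get_query_id'),key='return_value' }}"},
    )

    t1 >> t2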
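My last task fails with the error below. I don't understand why it doesn't receive the value pushed to XCom, and I'm not sure whether this is a bug or I've simply missed something.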
*** Reading remote log from s3://ob-airflow-pre/logs/xcom_using_jinja_template/templated/2021-05-26T17:22:44.023533+00:00/1.log.
[2021-05-26 17:22:45,633] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:877} INFO - Dependencies all met for <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [queued]>
[2021-05-26 17:22:45,663] {taskinstance.py:1068} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,663] {taskinstance.py:1069} INFO - Starting attempt 1 of 2
[2021-05-26 17:22:45,664] {taskinstance.py:1070} INFO -
--------------------------------------------------------------------------------
[2021-05-26 17:22:45,675] {taskinstance.py:1089} INFO - Executing <Task(BashOperator): templated> on 2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,679] {standard_task_runner.py:52} INFO - Started process 413 to run task
[2021-05-26 17:22:45,683] {standard_task_runner.py:76} INFO - Running: ['airflow','tasks','run','xcom_using_jinja_template','templated','2021-05-26T17:22:44.023533+00:00','--job-id','1811','--pool','default_pool','--raw','--subdir','DAGS_FOLDER/xcom_test.py','--cfg-path','/tmp/tmpkk2x0gyd','--error-file','/tmp/tmpc2ka7x4x']
[2021-05-26 17:22:45,683] {standard_task_runner.py:77} INFO - Job 1811: Subtask templated
[2021-05-26 17:22:45,859] {logging_mixin.py:104} INFO - Running <TaskInstance: xcom_using_jinja_template.templated 2021-05-26T17:22:44.023533+00:00 [running]> on host airflow-worker-1.airflow-worker.airflow.svc.cluster.local
[2021-05-26 17:22:45,945] {taskinstance.py:1281} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=airflow@example.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=xcom_using_jinja_template
AIRFLOW_CTX_TASK_ID=templated
AIRFLOW_CTX_EXECUTION_DATE=2021-05-26T17:22:44.023533+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-05-26T17:22:44.023533+00:00
[2021-05-26 17:22:45,946] {bash.py:135} INFO - Tmp dir root location:
/tmp
[2021-05-26 17:22:45,947] {bash.py:158} INFO - Running command:
echo ;
echo {;
echo {;
echo ;
echo t;
echo i;
echo .;
echo x;
echo c;
echo o;
echo m;
echo _;
echo p;
echo u;
echo l;
echo l;
echo (;
echo t;
echo a;
echo s;
echo k;
echo _;
echo i;
echo d;
echo s;
echo =;
echo ';
echo g;
echo e;
echo t;
echo _;
echo q;
echo u;
echo e;
echo r;
echo y;
echo _;
echo i;
echo d;
echo ';
echo );
echo ,;
echo ;
echo k;
echo e;
echo y;
echo =;
echo ';
echo r;
echo e;
echo t;
echo u;
echo r;
echo n;
echo _;
echo v;
echo a;
echo l;
echo u;
echo e;
echo ';
echo ;
echo };
echo };
[2021-05-26 17:22:45,954] {bash.py:169} INFO - Output:
[2021-05-26 17:22:45,955] {bash.py:173} INFO -
[2021-05-26 17:22:45,955] {bash.py:173} INFO - {
[2021-05-26 17:22:45,955] {bash.py:173} INFO - t
[2021-05-26 17:22:45,955] {bash.py:173} INFO - i
[2021-05-26 17:22:45,955] {bash.py:173} INFO - .
[2021-05-26 17:22:45,955] {bash.py:173} INFO - x
[2021-05-26 17:22:45,955] {bash.py:173} INFO - c
[2021-05-26 17:22:45,955] {bash.py:173} INFO - o
[2021-05-26 17:22:45,955] {bash.py:173} INFO - m
[2021-05-26 17:22:45,955] {bash.py:173} INFO - _
[2021-05-26 17:22:45,955] {bash.py:173} INFO - p
[2021-05-26 17:22:45,956] {bash.py:173} INFO - u
[2021-05-26 17:22:45,956] {bash.py:173} INFO - l
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: syntax error near unexpected token `;'
[2021-05-26 17:22:45,956] {bash.py:173} INFO - bash: -c: line 34: ` echo (;'
[2021-05-26 17:22:45,956] {bash.py:177} INFO - Command exited with return code 1
[2021-05-26 17:22:45,976] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1138, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1341, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/bash.py", line 180, in execute
    raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2021-05-26 17:22:45,978] {taskinstance.py:1525} INFO - Marking task as UP_FOR_RETRY. dag_id=xcom_using_jinja_template,task_id=templated,execution_date=20210526T172244,start_date=20210526T172245,end_date=20210526T172245
[2021-05-26 17:22:46,014] {local_task_job.py:146} INFO - Task exited with return code 1
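You can reproduce the one-character-per-echo pattern in this log in plain Python. The sketch assumes, as the answer explains, that params.query_ids reaches the template as the literal, unrendered string. Iterating over a string yields single characters, and item[0] of a one-character string is that same character. render_echo_lines is a hypothetical helper that mimics the Jinja loop. It is not part of Airflow.

```python
def render_echo_lines(query_ids):
    """Hypothetical helper mimicking the question's Jinja loop:
    for each item in query_ids, emit "echo <item[0]>;".
    """
    return [f"echo {item[0]};" for item in query_ids]


# What the template actually iterated over: the params value, copied verbatim
# from the question. It was never rendered, so it is just a plain string.
literal = " {{ ti.xcom_pull(task_ids='get_query_id'),key='return_value' }}"

lines = render_echo_lines(literal)
for line in lines[:5]:
    print(line)
```

The generated lines include "echo (;". Bash stops at that line with the syntax error shown above because an unquoted ( is shell syntax rather than an ordinary argument.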
When I replace params.query_ids with the list above (hard-coded), I get what I expected.
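templated_command = dedent(
    """
{% for item in [(19343160,), (19350561,), (19351381,), (19351978,), (19356674,), (19356676,), (19356678,), (19356681,), (19356682,), (19359607,)] %}
    echo {{ item[0] }};
{% endfor %}
"""
)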
Expected result:
[2021-05-27 10:59:05,887] {bash.py:158} INFO - Running command:
echo 19343160;
echo 19350561;
echo 19351381;
echo 19351978;
echo 19356674;
echo 19356676;
echo 19356678;
echo 19356681;
echo 19356682;
echo 19359607;
Solution
I answered this on the Astronomer forum, but I'm posting the answer here as well in case it helps others.
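The way the code is written, you can't use params as a Jinja template inside bash_command. The values in params are not rendered themselves, because params is not one of the template_fields of BashOperator (those are bash_command and env). However, you can reference the return_value XCom of the get_query_id task as a variable in Jinja, as follows: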
templated_command = dedent(
    """
{% set query_ids = ti.xcom_pull(task_ids='get_query_id', key='return_value') %}
{% for item in query_ids %}
    echo {{ item[0] }};
{% endfor %}
"""
)

t2 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
)
Now templated_command references the XCom you need directly and stores it in a Jinja variable, which produces the output you expected.
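This works because ti is part of the template context that bash_command is rendered with. The XCom is therefore pulled at render time, and query_ids is a real list of rows instead of a string. Below is a plain-Python model of that flow. FakeTaskInstance, set_xcom and render_command are hypothetical names, not Airflow's API. Only the xcom_pull keyword names task_ids and key mirror the call in the template.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (not its real API):
    XCom values live in a dict keyed by (task_id, key)."""

    def __init__(self):
        self._xcoms = {}

    def set_xcom(self, task_id, value, key="return_value"):
        self._xcoms[(task_id, key)] = value

    def xcom_pull(self, task_ids, key="return_value"):
        return self._xcoms.get((task_ids, key))


def render_command(ti):
    """Plain-Python equivalent of the answer's template: set query_ids from
    the XCom, then emit one echo line per row using its first column."""
    query_ids = ti.xcom_pull(task_ids="get_query_id", key="return_value")
    return "\n".join(f"echo {item[0]};" for item in query_ids)


ti = FakeTaskInstance()
# Simulate what get_query_id pushes: the rows returned by query_and_push.
ti.set_xcom("get_query_id", [(19343160,), (19350561,)])
print(render_command(ti))
```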