如何解决如何同步运行 Airflow 任务
- Pythonoperator --> 它在 AWS Athena 上运行查询并将生成的文件存储在特定的 s3 路径上
- BashOperator --> 增加用于跟踪的气流变量
- BashOperator --> 它获取 task1 的输出(响应)并在其上运行一些代码。
这里发生的是,即使 Athena 查询步骤正在运行,气流也会在几秒钟内完成。
我想确保在生成文件后应该运行进一步的步骤。基本上我希望这是同步的。
解决方法
您可以将任务设置为:
def athena_task():
# Add your code
return
t1 = PythonOperator(
task_id='athena_task',python_callable=athena_task,)
t2 = BashOperator(
task_id='variable_task',bash_command='',#replace with relevant command
)
t3 = BashOperator(
task_id='process_task',#replace with relevant command
)
t1 >> t2 >> t3
t2 仅在 t1 成功完成后才会运行,t3 仅在 t2 成功完成后才会启动。
请注意,Airflow 具有 AWSAthenaOperator,这可能会省去您自己编写代码的麻烦。操作员向 Athena 提交查询,并通过设置 output_location
参数将输出保存在 S3 路径中:
run_query = AWSAthenaOperator(
task_id='athena_task',query='SELECT * FROM my_table',output_location='s3://some-bucket/some-path/',database='my_database'
)
,
Athena 的查询 API 是异步的。您开始查询,取回 ID,然后您需要使用 GetQueryExecution
API 调用进行轮询,直到查询完成。
如果您只在第一个任务中开始查询,则无法保证在下一个任务运行时查询已完成。只有当 GetQueryExecution
返回 SUCCEEDED
(或 FAILED
/CANCELLED
)状态时,您才能期望输出文件存在。
正如@Elad 指出的那样,AWSAthenaOperator
会为您执行此操作,并处理错误情况等。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。