微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何同步运行 Airflow 任务

如何解决如何同步运行 Airflow 任务

我有一个包含 2-3 个步骤的气流

  1. Pythonoperator --> 它在 AWS Athena 上运行查询并将生成文件存储在特定的 s3 路径上
  2. BashOperator --> 增加用于跟踪的气流变量
  3. BashOperator --> 它获取 task1 的输出(响应)并在其上运行一些代码

这里发生的是,即使 Athena 查询步骤正在运行,气流也会在几秒钟内完成。

我想确保在生成文件后应该运行进一步的步骤。基本上我希望这是同步的。

解决方法

您可以将任务设置为:

def athena_task():
    # Add your code
    return

t1 = PythonOperator(
    task_id='athena_task',python_callable=athena_task,)

t2 = BashOperator(
    task_id='variable_task',bash_command='',#replace with relevant command
)

t3 = BashOperator(
    task_id='process_task',#replace with relevant command
)

t1 >> t2 >> t3

t2 仅在 t1 成功完成后才会运行,t3 仅在 t2 成功完成后才会启动。

请注意,Airflow 具有 AWSAthenaOperator,这可能会省去您自己编写代码的麻烦。操作员向 Athena 提交查询,并通过设置 output_location 参数将输出保存在 S3 路径中:

run_query = AWSAthenaOperator(
    task_id='athena_task',query='SELECT * FROM  my_table',output_location='s3://some-bucket/some-path/',database='my_database'
)
,

Athena 的查询 API 是异步的。您开始查询,取回 ID,然后您需要使用 GetQueryExecution API 调用进行轮询,直到查询完成。

如果您只在第一个任务中开始查询,则无法保证在下一个任务运行时查询已完成。只有当 GetQueryExecution 返回 SUCCEEDED(或 FAILED/CANCELLED)状态时,您才能期望输出文件存在。

正如@Elad 指出的那样,AWSAthenaOperator 会为您执行此操作,并处理错误情况等。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。