微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

不同工作节点上的 Airflow DAG 任务并行性

如何解决不同工作节点上的 Airflow DAG 任务并行性

我有一个由 3 个工作节点组成的 Airflow 集群,使用 CeleryExecutor 和 RabbitMQ 进行通信。 我的 DAG 通常由下载文件、解压缩文件上传文件到 hadoop 等任务组成。因此它们相互依赖并且必须在单个机器/节点上运行。

当气流调度单个 DAG 的这些任务时,在不同的节点,我最终会出错,因为这些任务被安排在不同的机器上,但我需要在一个 DAG 中安排所有任务单机。

我尝试在airflow.cfg和初始化dag时设置dag_concurrency = 1和max_active_runs_per_dag = 1,但没有成功。

我的气流.cfg 的其余部分:

parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16

据我所知,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里遗漏了什么?

解决方法

CeleryExecutor 支持多个 queues,您可以为每个操作符定义一个特定的队列(是 BaseOperator 的一个属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。

来自docs

Worker 可以监听一个或多个任务队列。当一个工人 启动(使用命令airflow celery worker),一组 可以指定逗号分隔的队列名称(例如气流芹菜 工人 -q 火花)。然后,该工人将只接接连线到的任务 指定的队列

这是一个 DAG 示例:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'airflow','depends_on_past': False,'start_date': days_ago(1),}
dag = DAG('dist_example',schedule_interval='@once',catchup=False,default_args=default_args
          )
get_hostname = 'echo $(hostname)'

t1 = BashOperator(
    task_id='task_for_q1',bash_command=get_hostname,queue='queue_1',dag=dag
)
t2 = BashOperator(
    task_id='task_for_q2',queue='queue_2',dag=dag
)
t1 >> t2

worker_1: airflow celery worker -q default,queue_1

worker_2: airflow celery worker -q default,queue_2

通过同时侦听您的特定队列和 default(由 default_queue 配置键定义),您不会影响任何其他任务的标准多线程行为。

希望对你有用!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。