如何解决不同工作节点上的 Airflow DAG 任务并行性
我有一个由 3 个工作节点组成的 Airflow 集群,使用 CeleryExecutor 和 RabbitMQ 进行通信。 我的 DAG 通常由下载文件、解压缩文件、上传文件到 hadoop 等任务组成。因此它们相互依赖并且必须在单个机器/节点上运行。
当气流调度单个 DAG 的这些任务时,在不同的节点,我最终会出错,因为这些任务被安排在不同的机器上,但我需要在一个 DAG 中安排所有任务单机。
我尝试在airflow.cfg和初始化dag时设置dag_concurrency = 1和max_active_runs_per_dag = 1,但没有成功。
我的气流.cfg 的其余部分:
parallelism = 32
dag_concurrency = 1
worker_concurrency = 16
max_active_runs_per_dag = 16
据我所知,将 dag_concurrency 设置为 1 应该可以解决问题,但我在这里遗漏了什么?
解决方法
CeleryExecutor
支持多个 queues
,您可以为每个操作符定义一个特定的队列(是 BaseOperator
的一个属性),然后为每个工作人员订阅该特定队列。请注意,worker 可以监听一个或多个队列。
来自docs:
Worker 可以监听一个或多个任务队列。当一个工人 启动(使用命令airflow celery worker),一组 可以指定逗号分隔的队列名称(例如气流芹菜 工人 -q 火花)。然后,该工人将只接接连线到的任务 指定的队列
这是一个 DAG 示例:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow','depends_on_past': False,'start_date': days_ago(1),}
dag = DAG('dist_example',schedule_interval='@once',catchup=False,default_args=default_args
)
get_hostname = 'echo $(hostname)'
t1 = BashOperator(
task_id='task_for_q1',bash_command=get_hostname,queue='queue_1',dag=dag
)
t2 = BashOperator(
task_id='task_for_q2',queue='queue_2',dag=dag
)
t1 >> t2
worker_1:
airflow celery worker -q default,queue_1
worker_2:
airflow celery worker -q default,queue_2
通过同时侦听您的特定队列和 default
(由 default_queue
配置键定义),您不会影响任何其他任务的标准多线程行为。
希望对你有用!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。