
Airflow docker swarm

How to solve Airflow docker swarm worker shutdowns

I am currently setting up remote workers for Apache Airflow on AWS EC2 instances via docker swarm.

The remote workers shut down every 60 seconds for no apparent reason with the following error:

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432

/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
  warnings.warn(msg,category=DeprecationWarning)
Starting flask
 * Serving Flask app "airflow.utils.serve_logs" (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801: RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!

Please specify a different user using the --uid option.

User information: uid=1000 euid=1000 gid=0 egid=0

[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.

worker: Warm shutdown (MainProcess)

 -------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport:   redis://redis:6379/0
- ** ---------- .> results:     postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> default          exchange=default(direct) key=default


[tasks]
  . airflow.executors.celery_executor.execute_command

All docker services running in the docker stack on my manager node work fine, as does the selenium service on the remote node. Following the docker compose setup for Airflow here, I put together the docker compose file shown below.

Postgres, Redis and Selenium are standard images.

For the airflow services there are two images:

  1. airflow-manager is just the name of the image created locally when the containers are started

  2. localhost:5000/myadmin/airflow-remote is the same image pushed to a local registry so that the other machines can see it.
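For reference, getting the image into the local registry typically looks something like the commands below. This is a sketch under the assumption that the Dockerfile sits in the project root and that the registry at localhost:5000 is already running; the image name is taken from the compose file above.

```shell
# Build the image locally (the same Dockerfile is used by all airflow services)
docker build -t localhost:5000/myadmin/airflow-remote .

# Push it to the local registry so the worker nodes can pull it
docker push localhost:5000/myadmin/airflow-remote

# Deploy (or update) the stack from the manager node
docker stack deploy -c docker-compose.yaml airflow
```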

docker-compose.yaml:

version: '3.7'

services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD","pg_isready","-d","postgres","-U","airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD","redis-cli","ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD","curl","--fail","http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]

  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]

  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD","curl","--fail","http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []


volumes:
  postgres-db-volume:


Dockerfile:


FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
RUN pip install -r /tmp/requirements.txt

Environment files

airflow.env:


PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true

postgres_test.env:


POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow

init.env:

_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

I saw that this problem was solved for others by setting the env CELERY_ACKS_LATE=true, but that did not help in my case.

This is a very annoying problem because it spams my flower worker monitoring, and I want to scale up to more workers running on other nodes.

Do you have any idea what this could be? Any help is appreciated!

Thanks in advance!

Workaround

Just to let you know: I was able to solve this by setting CELERY_WORKER_MAX_TASKS_PER_CHILD=500, which otherwise defaults to 50. Our Airflow DAG sends around 85 tasks to this worker, so it was probably overwhelmed.
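A minimal sketch of the fix, assuming the variable is added to the same config/airflow.env file that the worker services already load (per the author's report, this environment variable is picked up by the celery worker):

```
# config/airflow.env -- additional line
# Recycle each prefork child only after 500 tasks instead of the default 50
CELERY_WORKER_MAX_TASKS_PER_CHILD=500
```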

Apparently celery stops accepting further incoming messages from redis when its outgoing message pipeline is full, and redis then shuts the worker down.

After two of us searched for several days, we found the answer. Apparently it is still a workaround, but it works fine for now. I found the answer in this github issue. Just wanted to let you know.

If you have any further insights, feel free to share them.

