How to solve Airflow worker shutdown on docker swarm
I am currently setting up remote workers for Apache Airflow on AWS EC2 instances via docker swarm. The remote worker starts up and connects, but then shuts down with a warm shutdown, as the log below shows:
BACKEND=postgresql+psycopg2
DB_HOST=postgres
DB_PORT=5432
/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:813: DeprecationWarning: Specifying both AIRFLOW_HOME environment variable and airflow_home in the config file is deprecated. Please use only the AIRFLOW_HOME environment variable and remove the config file entry.
warnings.warn(msg,category=DeprecationWarning)
Starting flask
* Serving Flask app "airflow.utils.serve_logs" (lazy loading)
* Environment: production
WARNING: This is a development server. Do not use it in a production deployment.
Use a production WSGI server instead.
* Debug mode: off
[2021-05-26 08:37:48,027] {_internal.py:113} INFO - * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
/home/airflow/.local/lib/python3.7/site-packages/celery/platforms.py:801 RuntimeWarning: You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=1000 euid=1000 gid=0 egid=0
[2021-05-26 08:37:49,557: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-05-26 08:37:49,567: INFO/MainProcess] mingle: searching for neighbors
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync with 3 nodes
[2021-05-26 08:37:50,587: INFO/MainProcess] mingle: sync complete
[2021-05-26 08:37:50,601: INFO/MainProcess] celery@fcd56490a11f ready.
[2021-05-26 08:37:55,296: INFO/MainProcess] Events of group {task} enabled by remote.
worker: Warm shutdown (MainProcess)
-------------- celery@fcd56490a11f v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1045-aws-x86_64-with-debian-10.8 2021-05-26 08:37:48
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: airflow.executors.celery_executor:0x7f951e9d3fd0
- ** ---------- .> transport: redis://redis:6379/0
- ** ---------- .> results: postgresql://airflow:**@postgres/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default exchange=default(direct) key=default
[tasks]
. airflow.executors.celery_executor.execute_command
All docker services running in the docker stack on my manager node work fine, and the selenium service on the remote node also runs fine. Following the docker compose setup for Airflow here, I developed the docker compose file shown below.
Postgres, Redis and Selenium are standard images.
For the Airflow services, there are two images:
docker-compose.yaml:
version: '3.7'
services:
  postgres:
    image: postgres:13
    env_file:
      - ./config/postgres_test.env
    ports:
      - 5432:5432
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-d", "postgres", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  redis:
    image: redis:latest
    env_file:
      - ./config/postgres_test.env
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-webserver:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-scheduler:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: scheduler
    restart: always
    depends_on:
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-manager:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8794:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == manager ]

  airflow-worker-remote:
    image: localhost:5000/myadmin/airflow-remote
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery worker
    restart: always
    ports:
      - 8795:8080
    depends_on:
      - airflow-scheduler
      - airflow-webserver
      - airflow-init
    deploy:
      placement:
        constraints: [ node.role == worker ]

  airflow-init:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
      - ./config/init.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: version
    depends_on:
      - postgres
      - redis
    deploy:
      placement:
        constraints: [ node.role == manager ]

  flower:
    image: airflow-manager
    build:
      context: .
      dockerfile: Dockerfile
    env_file:
      - ./config/airflow.env
      - ./config/postgres_test.env
    volumes:
      - ./:/opt/airflow
    user: "${AIRFLOW_UID:-1000}:${AIRFLOW_GID:-0}"
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on: []
    deploy:
      placement:
        constraints: [ node.role == manager ]

  selenium-chrome:
    image: selenium/standalone-chrome:latest
    ports:
      - 4444:4444
    deploy:
      placement:
        constraints: [ node.role == worker ]
    depends_on: []

volumes:
  postgres-db-volume:
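Note that `docker stack deploy` on a swarm ignores the `build:` keys, so the images have to be built on the manager and the remote-worker image pushed to a registry that the worker nodes can reach. A minimal sketch of that sequence, assuming the local registry at localhost:5000 is already running and the stack name `airflow` is an assumption:

```shell
# Build the image used by the services pinned to the manager node
docker build -t airflow-manager -f Dockerfile .

# Build and push the remote-worker image so worker nodes can pull it
docker build -t localhost:5000/myadmin/airflow-remote -f Dockerfile .
docker push localhost:5000/myadmin/airflow-remote

# Deploy the stack from the manager (stack name "airflow" is an assumption)
docker stack deploy -c docker-compose.yaml airflow
```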
Dockerfile:
FROM apache/airflow:2.0.1-python3.7
COPY config/requirements.txt /tmp/
RUN mkdir -p /home/airflow/.cache/zeep
RUN chmod -R 777 /home/airflow/.cache/zeep
RUN chmod -R 777 /opt/airflow/
RUN mkdir -p /home/airflow/.wdm
RUN chmod -R 777 /home/airflow/.wdm
RUN pip install -r /tmp/requirements.txt
Environment files:
airflow.env:
PYTHONPATH=/opt/airflow/
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__AIRFLOW_HOME=/opt/airflow/
AIRFLOW__CORE__DAGS_FOLDER=/opt/airflow/dags
AIRFLOW__CORE__ENABLE_XCOM_PICKLING=true
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL=redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY=****
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION=true
AIRFLOW__CORE__LOAD_EXAMPLES=false
AIRFLOW__CORE__PLUGINS_FOLDER=/plugins/
AIRFLOW__CORE__PARALLELISM=48
AIRFLOW__CORE__DAG_CONCURRENCY=8
AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1
AIRFLOW__WEBSERVER__DAG_DEFAULT_VIEW=graph
AIRFLOW__WEBSERVER__LOG_FETCH_TIMEOUT_SEC=30
AIRFLOW__WEBSERVER__HIDE_PAUSED_DAGS_BY_DEFAULT=true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT=false
CELERY_ACKS_LATE=true
postgres_test.env:
POSTGRES_USER=airflow
POSTGRES_PASSWORD=airflow
POSTGRES_HOST=postgres
POSTGRES_PORT=5432
POSTGRES_DB=airflow
init.env:
_AIRFLOW_DB_UPGRADE=true
_AIRFLOW_WWW_USER_CREATE=true
_AIRFLOW_WWW_USER_USERNAME=${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD=${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
I have seen that setting the env CELERY_ACKS_LATE=true solved this problem for others, but it did not help in my case.
This is a very annoying problem because it spams my Flower worker monitoring, and I want to scale out to more workers running on other nodes.
Do you know what this could be? Any help is appreciated!
Thanks in advance!
Workaround
Just to let you know: I was able to solve the problem by setting
CELERY_WORKER_MAX_TASKS_PER_CHILD=500
which otherwise defaults to 50. Our Airflow DAG sends about 85 tasks to that worker, so it was probably overwhelmed.
Apparently celery stops accepting further incoming messages from redis when its pipeline of outstanding messages is full, and redis then shuts the worker down.
After the two of us searched for several days, we found the answer. It is admittedly still a workaround, but it works fine now. I found the answer in this github issue.
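Concretely, the fix amounts to one extra line in the env file shared by the worker services; putting it in ./config/airflow.env (rather than a dedicated worker env file) is an assumption:

```shell
# Recycle each worker child process after 500 tasks instead of the
# limit that was in effect before, so the worker is not overwhelmed
CELERY_WORKER_MAX_TASKS_PER_CHILD=500
```

After adding the line, the stack has to be redeployed so the workers pick up the new environment.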
If you have further insights, feel free to share them.