Airflow 2: error when using AWS RDS Postgres as the metastore


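Hello developers, I have been trying to figure out how to use AWS RDS as the Airflow metastore.

I am posting this question after going through several posts and blogs.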


I tried with Postgres version 9.6.17-R1 and with version 12.

Here is the Docker Compose file I am using:

version: '3'

x-airflow-common:
  &airflow-common
  image: apache/airflow:2.0.1
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: 'postgresql+psycopg2://<USERNAME>:<PASSWORD>@XXXX:5432/airflow-metastore'
#    AIRFLOW__CELERY__RESULT_BACKEND: 'db+postgresql://<USERNAME>:<PASSWORD>@XXXX:5432/airflow-metastore'
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
#    POSTGRES_USER : XX
#    POSTGRES_PASSWORD : XX
#    POSTGRES_HOST : XXX
#    export POSTGRES_PORT : XX
#    POSTGRES_DB: airflow-metastore
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  command: pip3 install  -r requirements.txt
  depends_on:
    redis:
      condition: service_healthy
#    postgres:
#      condition: service_healthy

services:

#  postgres:
#    image: postgres:13
#    environment:
#      POSTGRES_USER: airflow
#      POSTGRES_PASSWORD: airflow
#      POSTGRES_DB: airflow
#    healthcheck:
#      test: ["CMD","pg_isready","-U","airflow"]
#      interval: 5s
#      retries: 5
#    restart: always

  redis:
    image: redis:latest
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD","redis-cli","ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD","curl","--fail","http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD","curl","--fail","http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
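
The value of AIRFLOW__CORE__SQL_ALCHEMY_CONN above is a SQLAlchemy-style URL, and its last path segment is the database name that Postgres will be asked to open. The following is a minimal sketch, using only Python's standard library, of how such a URL breaks down. The host `example-host` and the credentials are placeholders chosen for illustration (assumptions, not values from the question), and `describe_conn` is a hypothetical helper name.

```python
from urllib.parse import urlsplit


def describe_conn(uri: str) -> dict:
    """Split a SQLAlchemy-style URL into its parts without opening a connection."""
    parts = urlsplit(uri)
    return {
        "driver": parts.scheme,              # e.g. postgresql+psycopg2
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),  # the name Postgres must already have
    }


# Placeholder URI with the same shape as the one in the compose file.
info = describe_conn(
    "postgresql+psycopg2://user:secret@example-host:5432/airflow-metastore"
)
print(info["database"])
```

With a URI of this shape, the server at the given host and port has to contain a database named exactly like the `database` field; Airflow's migration step creates tables inside that database but, as far as I know, does not create the database itself.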


Here is what I tried:

Running docker-compose up airflow-init throws the following error:

 [2021-03-21 14:22:24,386] {db.py:674} INFO - Creating tables
airflow-init_1       | Traceback (most recent call last):
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 2336,in _wrap_pool_connect
airflow-init_1       |     return fn()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 304,in unique_connection
airflow-init_1       |     return _ConnectionFairy._checkout(self)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 778,in _checkout
airflow-init_1       |     fairy = _ConnectionRecord.checkout(pool)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 495,in checkout
airflow-init_1       |     rec = pool._do_get()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py",line 140,in _do_get
airflow-init_1       |     self._dec_overflow()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py",line 70,in __exit__
airflow-init_1       |     with_traceback=exc_tb,
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",line 182,in raise_
airflow-init_1       |     raise exception
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/impl.py",line 137,in _do_get
airflow-init_1       |     return self._create_connection()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 309,in _create_connection
airflow-init_1       |     return _ConnectionRecord(self)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 440,in __init__
airflow-init_1       |     self.__connect(first_connect_check=True)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 661,in __connect
airflow-init_1       |     pool.logger.debug("Error on connect(): %s",e)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/langhelpers.py",in raise_
airflow-init_1       |     raise exception
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py",line 656,in __connect
airflow-init_1       |     connection = pool._invoke_creator(self)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py",line 114,in connect
airflow-init_1       |     return dialect.connect(*cargs,**cparams)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py",line 508,in connect
airflow-init_1       |     return self.dbapi.connect(*cargs,**cparams)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/psycopg2/__init__.py",line 127,in connect
airflow-init_1       |     conn = _connect(dsn,connection_factory=connection_factory,**kwasync)
airflow-init_1       | psycopg2.OperationalError: FATAL:  database "airflow-metastore" does not exist
airflow-init_1       |
airflow-init_1       |
airflow-init_1       | The above exception was the direct cause of the following exception:
airflow-init_1       |
airflow-init_1       | Traceback (most recent call last):
airflow-init_1       |   File "/home/airflow/.local/bin/airflow",line 8,in <module>
airflow-init_1       |     sys.exit(main())
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py",line 40,in main
airflow-init_1       |     args.func(args)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py",line 48,in command
airflow-init_1       |     return func(*args,**kwargs)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py",line 89,in wrapper
airflow-init_1       |     return f(*args,**kwargs)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/db_command.py",in upgradedb
airflow-init_1       |     db.upgradedb()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/db.py",line 684,in upgradedb
airflow-init_1       |     command.upgrade(config,'heads')
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/alembic/command.py",line 294,in upgrade
airflow-init_1       |     script.run_env()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/alembic/script/base.py",line 481,in run_env
airflow-init_1       |     util.load_python_file(self.dir,"env.py")
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/alembic/util/pyfiles.py",line 97,in load_python_file
airflow-init_1       |     module = load_module_py(module_id,path)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/alembic/util/compat.py",in load_module_py
airflow-init_1       |     spec.loader.exec_module(module)
airflow-init_1       |   File "<frozen importlib._bootstrap_external>",line 678,in exec_module
airflow-init_1       |   File "<frozen importlib._bootstrap>",line 219,in _call_with_frames_removed
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/migrations/env.py",line 108,in <module>
airflow-init_1       |     run_migrations_online()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/airflow/migrations/env.py",line 91,in run_migrations_online
airflow-init_1       |     with connectable.connect() as connection:
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 2263,in connect
airflow-init_1       |     return self._connection_cls(self,**kwargs)
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 104,in __init__
airflow-init_1       |     else engine.raw_connection()
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 2370,in raw_connection
airflow-init_1       |     self.pool.unique_connection,_connection
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 2340,in _wrap_pool_connect
airflow-init_1       |     e,dialect,self
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",line 1584,in _handle_dbapi_exception_noconnection
airflow-init_1       |     sqlalchemy_exception,with_traceback=exc_info[2],from_=e
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/util/compat.py",in raise_
airflow-init_1       |     raise exception
airflow-init_1       |   File "/home/airflow/.local/lib/python3.6/site-packages/sqlalchemy/engine/base.py",**kwasync)
airflow-init_1       | sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL:  database "airflow-metastore" does not exist
airflow-init_1       |
airflow-init_1       | (Background on this error at: http://sqlalche.me/e/13/e3q8)
airflow-init_1       | usage: airflow [-h] GROUP_OR_COMMAND ...


airflow-init_1       | positional arguments:
airflow-init_1       |   GROUP_OR_COMMAND
airflow-init_1       |
airflow-init_1       |     Groups:
airflow-init_1       |       celery         Celery components
airflow-init_1       |       config         View configuration
airflow-init_1       |       connections    Manage connections
airflow-init_1       |       dags           Manage DAGs
airflow-init_1       |       db             Database operations
airflow-init_1       |       kubernetes     Tools to help run the KubernetesExecutor
airflow-init_1       |       pools          Manage pools
airflow-init_1       |       providers      display providers
airflow-init_1       |       roles          Manage roles
airflow-init_1       |       tasks          Manage tasks
airflow-init_1       |       users          Manage users
airflow-init_1       |       variables      Manage variables
airflow-init_1       |
airflow-init_1       |     Commands:
airflow-init_1       |       cheat-sheet    display cheat sheet
airflow-init_1       |       info           Show information about current Airflow and environment
airflow-init_1       |       kerberos       Start a kerberos ticket renewer
airflow-init_1       |       plugins        Dump information about loaded plugins
airflow-init_1       |       rotate-fernet-key
airflow-init_1       |                      Rotate encrypted connection credentials and variables
airflow-init_1       |       scheduler      Start a scheduler instance
airflow-init_1       |       sync-perm      Update permissions for existing roles and DAGs
airflow-init_1       |       version        Show the version
airflow-init_1       |       webserver      Start a Airflow webserver instance
airflow-init_1       |
airflow-init_1       | optional arguments:
airflow-init_1       |   -h,--help         show this help message and exit
airflow-init_1       |
airflow-init_1       | airflow command error: argument GROUP_OR_COMMAND: `airflow upgradedb` command,has been removed,please use `airflow db upgrade`,see help above.
airflow-init_1       | 2.0.1
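
The FATAL line in the log comes from the Postgres server itself, so the RDS instance was reached; what it reports is that no database named airflow-metastore exists on it. One possible cause, stated here as an assumption rather than a diagnosis, is that the RDS instance was created without an initial database of that name. The sketch below shows one way to check for the database and create it if it is missing. It assumes psycopg2 (which appears in the traceback), a maintenance database called `postgres`, and a user allowed to create databases; `quote_ident` and `ensure_database` are hypothetical helper names. Because the name contains a hyphen, it has to be double-quoted in the CREATE DATABASE statement.

```python
def quote_ident(name: str) -> str:
    """Double-quote a Postgres identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def ensure_database(conn, name: str) -> bool:
    """Create database `name` if pg_database has no row for it.

    `conn` is a DB-API connection (for example from psycopg2) that is in
    autocommit mode, since CREATE DATABASE cannot run inside a transaction.
    Returns True if the database was created, False if it already existed.
    """
    cur = conn.cursor()
    cur.execute("SELECT 1 FROM pg_database WHERE datname = %s", (name,))
    if cur.fetchone() is not None:
        return False
    cur.execute("CREATE DATABASE " + quote_ident(name))
    return True


# Usage sketch (not executed here; endpoint and credentials are placeholders):
#
#   import psycopg2
#   conn = psycopg2.connect(host="<rds-endpoint>", port=5432,
#                           user="<USERNAME>", password="<PASSWORD>",
#                           dbname="postgres")
#   conn.autocommit = True
#   ensure_database(conn, "airflow-metastore")
```

After the database exists, rerunning docker-compose up airflow-init should at least get past this particular connection error; whether the rest of the initialization succeeds depends on the remaining configuration.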


I have searched a lot but could not find a solution. Any help would be great.
