如何解决气流导入问题
我们已经配置了 Airflow,并且有两个气流 dag 工作正常。我创建了一个新的 dag,可以在 Airflow UI 中看到相同的内容。但是当我尝试从 UI 打开 Airflow Dag 时,它给了我以下错误
“DAG
- 首先,我尝试运行我的管道 py 文件以查看它是否有任何问题:
python sample_dag_pipeline.py
['/home/airflow/airflow-install/dags/ABC_Middleware','/home/airflow/airflow-install/dags','/home/airflow/airflow-install/sample_folder/bin/python','/usr/lib64/python36.zip','/usr/lib64/python3.6','/usr/lib64/python3.6/lib-dynload','/home/airflow/airflow-install/sample_folder/lib64/python3.6/site-packages','/home/airflow/airflow-install/sample_folder/lib/python3.6/site-packages']
似乎工作正常。
- 然后我尝试运行:airflow list_dags 它给了我以下错误:
(env) [airflow@Airflow-VM dags]$ airflow list_dags
[2021-02-02 08:41:34,503] {__init__.py:50} INFO - Using executor LocalExecutor
[2021-02-02 08:41:34,507] {dagbag.py:417} INFO - Filling up the DagBag from /home/airflow/airflow-install/dags
['/home/airflow/airflow-install/sample_folder/bin','/home/airflow/airflow-install/dags/AirflowMiddleware','/home/airflow/airflow-install/sample_folder/lib/python3.6/site-packages','/home/airflow/airflow-install/config','/home/airflow/airflow-install/plugins']
[2021-02-02 08:41:35,044] {dagbag.py:259} ERROR - Failed to import: /home/airflow/airflow-install/dags/sample_dag_pipeline.py
Traceback (most recent call last):
File "/home/airflow/airflow-install/sample_folder/lib64/python3.6/site-packages/airflow/models/dagbag.py",line 256,in process_file
m = imp.load_source(mod_name,filepath)
File "/usr/lib64/python3.6/imp.py",line 172,in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>",line 684,in _load
File "<frozen importlib._bootstrap>",line 665,in _load_unlocked
File "<frozen importlib._bootstrap_external>",line 678,in exec_module
File "<frozen importlib._bootstrap>",line 219,in _call_with_frames_removed
File "/home/airflow/airflow-install/dags/sample_dag_pipeline.py",line 8,in <module>
from abc.def.gef import sample_dag_jobs
ImportError: cannot import name 'sample_dag_jobs'
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ABC_V01
DEF_V01
让我看看我的sample_dag_pipeline.py文件:
(env) [airflow@Airflow-VM dags]$ cat sample_dag_pipeline.py | head -20
import sys
import traceback
from importlib import reload
sys.path.insert(0,"/home/airflow/airflow-install/dags/ABC_Middleware")
from datetime import datetime
from abc.def.gef import sample_dag_jobs
dagutils = reload(cai_jobs_pipeline)
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import Pythonoperator
from airflow.utils.trigger_rule import TriggerRule
DAG_NAME = "ABC_DataPipeline_V1.0"
观察:
因此,在上面的 sample_dag_pipeline.py 文件中,我指的是其他目录(ABC_Middleware)中存在的代码,首先将其添加到 sys 路径中,然后从中导入模块。 我对我的另外两个 dag(ABC_V01,DEF_V01)做了同样的事情,它们工作正常,但对于这个却不是。
当我运行 py 文件时,它会向我打印 sys 路径,我可以看到添加到 sys.('/home/airflow/airflow-install/dags/ABC_Middleware') 中的路径
但是当我在 sys 路径中运行:airflow list_dags 时,我看不到我的目录的 sys 路径 ('/home/airflow/airflow-install/dags/ABC_Middleware')
我尝试了很多选择,但似乎都只在这里卡住了。任何人都可以提出可能发生的事情吗?
解决方法
这可能是一个问题,因为您使用的是 abc
作为包,这与 built-in stdlib module of the same name 发生冲突,并且由于它是在加载 DAG 文件之前由 Airflow 导入的,因此 Python 是在系统路径下查找更多模块。
尝试将 abc
重命名为其他名称。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。