微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

气流导入问题

如何解决气流导入问题

我们已经配置了 Airflow,并且有两个气流 dag 工作正常。我创建了一个新的 dag,可以在 Airflow UI 中看到相同的内容。但是当我尝试从 UI 打开 Airflow Dag 时,它给了我以下错误 “DAG 似乎丢失了”。

我尝试使用以下方法解决问题:

  1. 首先,我尝试运行我的管道 py 文件以查看它是否有任何问题:
     python sample_dag_pipeline.py
['/home/airflow/airflow-install/dags/ABC_Middleware','/home/airflow/airflow-install/dags','/home/airflow/airflow-install/sample_folder/bin/python','/usr/lib64/python36.zip','/usr/lib64/python3.6','/usr/lib64/python3.6/lib-dynload','/home/airflow/airflow-install/sample_folder/lib64/python3.6/site-packages','/home/airflow/airflow-install/sample_folder/lib/python3.6/site-packages']

似乎工作正常。

  1. 然后我尝试运行:airflow list_dags 它给了我以下错误
  (env) [airflow@Airflow-VM dags]$ airflow list_dags
[2021-02-02 08:41:34,503] {__init__.py:50} INFO - Using executor LocalExecutor
[2021-02-02 08:41:34,507] {dagbag.py:417} INFO - Filling up the DagBag from /home/airflow/airflow-install/dags
['/home/airflow/airflow-install/sample_folder/bin','/home/airflow/airflow-install/dags/AirflowMiddleware','/home/airflow/airflow-install/sample_folder/lib/python3.6/site-packages','/home/airflow/airflow-install/config','/home/airflow/airflow-install/plugins']
[2021-02-02 08:41:35,044] {dagbag.py:259} ERROR - Failed to import: /home/airflow/airflow-install/dags/sample_dag_pipeline.py
Traceback (most recent call last):
 File "/home/airflow/airflow-install/sample_folder/lib64/python3.6/site-packages/airflow/models/dagbag.py",line 256,in process_file
   m = imp.load_source(mod_name,filepath)
 File "/usr/lib64/python3.6/imp.py",line 172,in load_source
   module = _load(spec)
 File "<frozen importlib._bootstrap>",line 684,in _load
 File "<frozen importlib._bootstrap>",line 665,in _load_unlocked
 File "<frozen importlib._bootstrap_external>",line 678,in exec_module
 File "<frozen importlib._bootstrap>",line 219,in _call_with_frames_removed
 File "/home/airflow/airflow-install/dags/sample_dag_pipeline.py",line 8,in <module>
   from abc.def.gef import sample_dag_jobs
ImportError: cannot import name 'sample_dag_jobs'


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
ABC_V01
DEF_V01

让我看看我的sample_dag_pipeline.py文件

(env) [airflow@Airflow-VM dags]$ cat sample_dag_pipeline.py | head -20

import sys
import traceback
from importlib import reload

sys.path.insert(0,"/home/airflow/airflow-install/dags/ABC_Middleware")

from datetime import datetime
from abc.def.gef import sample_dag_jobs

dagutils = reload(cai_jobs_pipeline)

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import Pythonoperator
from airflow.utils.trigger_rule import TriggerRule

DAG_NAME = "ABC_DataPipeline_V1.0"

观察:

因此,在上面的 sample_dag_pipeline.py 文件中,我指的是其他目录(ABC_Middleware)中存在的代码,首先将其添加到 sys 路径中,然后从中导入模块。 我对我的另外两个 dag(ABC_V01,DEF_V01)做了同样的事情,它们工作正常,但对于这个却不是。

当我运行 py 文件时,它会向我打印 sys 路径,我可以看到添加到 sys.('/home/airflow/airflow-install/dags/ABC_Middleware') 中的路径

但是当我在 sys 路径中运行:airflow list_dags 时,我看不到我的目录的 sys 路径 ('/home/airflow/airflow-install/dags/ABC_Middleware')

我尝试了很多选择,但似乎都只在这里卡住了。任何人都可以提出可能发生的事情吗?

解决方法

这可能是一个问题,因为您使用的是 abc 作为包,这与 built-in stdlib module of the same name 发生冲突,并且由于它是在加载 DAG 文件之前由 Airflow 导入的,因此 Python 是在系统路径下查找更多模块。

尝试将 abc 重命名为其他名称。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?