How to fix Airflow 2 - ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'
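I am new to Airflow and am trying to build a DAG for processing text. I have a data pipeline made up of text-processing tasks: reading documents, cleaning the text, and loading the data into a JSON file. For text processing, each transformation task uses a custom operator, and these operators are kept in the text_processing_plugin folder. The full folder structure of the Airflow home directory, including the plugins folder, is: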
├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   └── d0.py
├── plugins
│   └── text_processing_plugin
│       ├── __init__.py
│       └── operators
│           ├── dependency_parsing.py
│           ├── entity_detection.py
│           ├── __init__.py
│           ├── lemmatize.py
│           ├── pos_tagging.py
│           ├── remove_stop_words.py
│           └── tokenize_doc.py
├── requirements.txt
└── unittests.cfg
where text_processing_plugin/__init__.py contains the following code:
from airflow.plugins_manager import AirflowPlugin
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator

class TextProcessingPlugin(AirflowPlugin):
    name = "text_processing_plugin"
    operators = [
        DependencyParsingOperator,
        DetectEntityOperator,
        LemmatizeOperator,
        POSTagOperator,
        RemoveStopWordsOperator,
        DocTokenizerOperator,
    ]
    sensors = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []
    global_operator_extra_links = []
    operator_extra_links = []
To build the DAG, I used the Airflow 1.x-style approach shown below:
import os
import json
import spacy
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
# Airflow 1.x-style plugin import; this is the line that fails under Airflow 2
from airflow.operators.text_processing_plugin import DependencyParsingOperator, DetectEntityOperator, DocTokenizerOperator

sp = spacy.load('en_core_web_sm')

default_args = {
    'owner': 'episource',
    'depends_on_past': True,
    'start_date': datetime(2021, 3, 30),
    'retries': 0,
}

dag = DAG(
    'text_processing_dag',
    description='Text Processing Dag',
    default_args=default_args,
    schedule_interval='@once',  # a DAG argument; it has no effect inside default_args
    catchup=False,
    tags=['text_processing'],
)

def read_doc(**kwargs):
    # No leading '/': an absolute component would make os.path.join drop os.getcwd()
    file_path = os.path.join(os.getcwd(), 'data', '1.txt')
    with open(file_path) as f:
        return f.read()

def write_to_json(**kwargs):
    ti = kwargs['ti']
    # task_ids must match the task_id values of the operators below
    result_1 = ti.xcom_pull(task_ids='transform_tokenize_doc')
    result_2 = ti.xcom_pull(task_ids='transform_detect_entity')
    print('result 1 is ', result_1)
    print('result 2 is ', result_2)
    with open(os.path.join(os.getcwd(), 'output', '1.json'), 'w') as file:
        # A single JSON object keeps the file valid JSON
        json.dump({'result_1': result_1, 'result_2': result_2}, file)

extract = PythonOperator(
    task_id='extract', python_callable=read_doc, dag=dag)
t11_tokenize_doc = DocTokenizerOperator(
    sp=sp, task_id="transform_tokenize_doc", dag=dag, name="Sentence Tokenizing", pool='t1', task_concurrency=2)
t12_detect_entities = DetectEntityOperator(
    sp=sp, task_id="transform_detect_entity", dag=dag, name="Entity Detection", task_concurrency=2)
load = PythonOperator(
    task_id='load', python_callable=write_to_json, dag=dag)

extract >> [t11_tokenize_doc, t12_detect_entities] >> load
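Two pitfalls around read_doc and write_to_json are easy to miss because they only surface when the task runs: passing '/data/1.txt' (with a leading slash) to os.path.join, and writing two json.dumps() results into the same file. The snippet below reproduces both with the standard library alone. It uses posixpath so the result does not depend on the OS; the base directory /opt/airflow and the two sample values are hypothetical stand-ins, not taken from the question.

```python
import json
import posixpath

# Hypothetical working directory, used only for illustration.
base = "/opt/airflow"

# An absolute component makes join() discard everything before it...
joined_abs = posixpath.join(base, "/data/1.txt")
# ...while relative components are appended as expected.
joined_rel = posixpath.join(base, "data", "1.txt")
print(joined_abs)  # /data/1.txt
print(joined_rel)  # /opt/airflow/data/1.txt

# Hypothetical stand-ins for the two values pulled from XCom.
result_1, result_2 = ["a"], {"b": 1}

# Two json.dumps() strings written back to back are not one JSON document.
concatenated = json.dumps(result_1) + json.dumps(result_2)
error_msg = None
try:
    json.loads(concatenated)
except json.JSONDecodeError as exc:
    error_msg = exc.msg
print(error_msg)  # Extra data

# Wrapping both values in one object keeps the output parseable.
single_doc = json.dumps({"result_1": result_1, "result_2": result_2})
print(json.loads(single_doc))
```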
When I try to run the code, I get:
Traceback (most recent call last):
  File "dags/d0.py", line 8, in <module>
    from airflow.operators.text_processing_plugin import DependencyParsingOperator, DocTokenizerOperator
ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'
I have looked at some existing answers on Stack Overflow but could not resolve the error. Any hints would be appreciated.
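When a ModuleNotFoundError like this appears, it can help to check which candidate import paths the interpreter can locate at all, without running the DAG. The following is a minimal sketch built on importlib.util.find_spec; module_resolves is a hypothetical helper name, not an Airflow API. Because find_spec imports the parent packages of a dotted name, it should be run in the same Python environment Airflow uses, with $AIRFLOW_HOME/plugins added to PYTHONPATH to mimic what Airflow does for the plugins folder.

```python
import importlib.util


def module_resolves(dotted_name: str) -> bool:
    """Return True if the import system can locate `dotted_name`.

    find_spec() imports the parent packages of a dotted name first, so a
    missing parent raises ModuleNotFoundError instead of returning None;
    both cases are reported as False here.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ModuleNotFoundError:
        return False
```

In an Airflow 2 environment, module_resolves("airflow.operators.text_processing_plugin") should return False, because Airflow 2 no longer creates that module for plugins, while module_resolves("text_processing_plugin.operators.tokenize_doc") should return True once the plugins folder is on the path.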
Solution
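Changed in version 2.0: importing operators, sensors, and hooks added in plugins via the airflow.{operators,sensors,hooks} namespace is no longer supported; these extensions should be imported as regular Python modules instead. Airflow adds the plugins folder to sys.path, so d0.py can import the operators from the package directly, the same way text_processing_plugin/__init__.py already does, e.g. from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator.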
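The import mechanism can be reproduced outside Airflow with a throwaway copy of the folder layout. This is an illustration under stated assumptions, not the question's actual code: the DocTokenizerOperator below is a hypothetical stand-in that does not subclass airflow.models.BaseOperator (so the sketch runs without Airflow installed), text_processing_plugin/__init__.py is left empty instead of defining an AirflowPlugin, and sys.path.insert emulates Airflow putting the plugins folder on sys.path.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical stand-in for operators/tokenize_doc.py. A real operator would
# subclass airflow.models.BaseOperator; that is omitted so this runs without Airflow.
OPERATOR_SOURCE = """
class DocTokenizerOperator:
    def __init__(self, task_id, **kwargs):
        self.task_id = task_id
"""


def build_plugins_folder(root: Path) -> Path:
    """Recreate plugins/text_processing_plugin/operators under root."""
    operators_dir = root / "plugins" / "text_processing_plugin" / "operators"
    operators_dir.mkdir(parents=True)
    # Empty package markers; the real top-level __init__.py also defines the plugin class.
    (operators_dir.parent / "__init__.py").write_text("")
    (operators_dir / "__init__.py").write_text("")
    (operators_dir / "tokenize_doc.py").write_text(OPERATOR_SOURCE)
    return root / "plugins"


with tempfile.TemporaryDirectory() as tmp:
    plugins_dir = build_plugins_folder(Path(tmp))
    # Emulate Airflow 2, which adds the plugins folder to sys.path.
    sys.path.insert(0, str(plugins_dir))
    importlib.invalidate_caches()
    try:
        # Airflow 2 style: an ordinary import of the package inside plugins/.
        tokenize_doc = importlib.import_module(
            "text_processing_plugin.operators.tokenize_doc"
        )
        op = tokenize_doc.DocTokenizerOperator(task_id="transform_tokenize_doc")
        print(type(op).__name__, op.task_id)
    finally:
        sys.path.remove(str(plugins_dir))
```

In the real layout, importing any text_processing_plugin.operators.* module first executes text_processing_plugin/__init__.py, so every operator module it imports (and dependencies such as spaCy) must import cleanly in Airflow's environment.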