Airflow 2 - ModuleNotFoundError:没有名为“airflow.operators.text_processing_plugin”的模块

如何解决Airflow 2 - ModuleNotFoundError:没有名为“airflow.operators.text_processing_plugin”的模块

我是 airflow 的新手,正在尝试制作用于处理文本的 dag。我有一个由文本处理任务组成的数据管道 - 阅读文档、清理文本和将数据加载到 JSON 文件。对于文本处理,每个转换任务都使用自定义运算符,它们保存在 text_processing_plugin 文件夹中。 plugin 文件夹的完整文件夹结构是:-

├── airflow.cfg
├── airflow.db
├── airflow-webserver.pid
├── dags
│   ├── d0.py
├── plugins
│   └── text_processing_plugin
│       ├── __init__.py
│       ├── operators
│       │   ├── dependency_parsing.py
│       │   ├── entity_detection.py
│       │   ├── __init__.py
│       │   ├── lemmatize.py
│       │   ├── pos_tagging.py
│       │   ├── remove_stop_words.py
│       │   └── tokenize_doc.pyfolder structure of plugin folder is:-
├── requirements.txt
├── unittests.cfg

其中 text_processing_plugin/__init__.py 具有以下代码:-

from airflow.plugins_manager import AirflowPlugin
from text_processing_plugin.operators.dependency_parsing import DependencyParsingOperator 
from text_processing_plugin.operators.entity_detection import DetectEntityOperator
from text_processing_plugin.operators.lemmatize import LemmatizeOperator
from text_processing_plugin.operators.pos_tagging import POSTagOperator
from text_processing_plugin.operators.remove_stop_words import RemoveStopWordsOperator
from text_processing_plugin.operators.tokenize_doc import DocTokenizerOperator

class TextProcessingPlugin(AirflowPlugin):
    name = "text_processing_plugin"
    operators = [DependencyParsingOperator,DetectEntityOperator,LemmatizeOperator,POSTagOperator,RemoveStopWordsOperator,DocTokenizerOperator]
    sensors = []
    hooks = []
    executors = []
    macros = []
    admin_views = []
    flask_blueprints = []
    menu_links = []
    appbuilder_views = []
    appbuilder_menu_items = []
    global_operator_extra_links = []
    operator_extra_links = []

为了制作 DAG,airflow 1.x 就像下面使用的范式:-

import os
import json
import spacy
from airflow import DAG
from airflow.operators.python import Pythonoperator
from datetime import datetime,timedelta

from airflow.operators.text_processing_plugin import DependencyParsingOperator,DocTokenizerOperator

sp = spacy.load('en_core_web_sm')

default_args = {
    'owner': 'episource','depends_on_past': True,'start_date': datetime.datetime(2021,3,30),'retries': 0,'schedule_interval':'@once',}

dag = DAG(
    'text_processing_dag',description='Text Processing Dag',default_args=default_args,catchup=False,tags=['text_processing'])

def read_doc(**kwargs):
    file_path = os.path.join(os.getcwd(),'/data/1.txt')
    doc = open(file_path).read()
    return doc

def write_to_json(**kwargs):
    ti = kwargs['ti']
    with open(os.path.join(os.getcwd,'output','1.json'),'a+') as file:
        result_1 = ti.xcom_pull(task_ids = 'tokenize_doc')
        result_2 = ti.xcom_pull(task_ids = 'detect_entity')
        print('result 1 is ',result_1)
        print('result 2 is ',result_2)
        file.write(json.dumps(result_1))
        file.write(json.dumps(result_2))


extract = Pythonoperator(
    task_id = 'extract',python_callable = read_doc,dag = dag)

t11_tokenize_doc = DocTokenizerOperator(
    sp = sp,task_id = "transform_tokenize_doc",dag = dag,name = "Sentence Tokenizing",pool='t1',task_concurrency=2)

t12_detect_entities = DetectEntityOperator(
    sp = sp,task_id = "transform_detect_entity",name = "Entity Detection",task_concurrency=2)

load = Pythonoperator(
    task_id = 'load',python_callable = write_to_json,dag = dag)

extract >> [t11_tokenize_doc,t12_detect_entities] >> load

当我尝试运行代码时,我得到:-

Traceback (most recent call last):
  File "dags/d0.py",line 8,in <module>
    from airflow.operators.text_processing_plugin import DependencyParsingOperator,DocTokenizerOperator
ModuleNotFoundError: No module named 'airflow.operators.text_processing_plugin'

我在 Stackoverflow 上引用了一些现有答案,但无法解决错误。希望得到一些提示

解决方法

2.0 版中更改:导入操作符、传感器、钩子 插件通过 airflow.{operators,sensors,hooks}. is no longer supported,这些扩展应该作为 常规 python 模块。有关详细信息,请参阅:Modules ManagementCreating a custom Operator

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?