通过文件监视程序在气流中触发dag

如何解决通过文件监视程序在气流中触发dag

我使用看门狗API遍历了这篇文章,似乎正是我所需要的: https://medium.com/@phanikumaryadavilli/hacking-apache-airflow-to-trigger-dags-based-on-filesystem-events-25f822fd08c3

代码不是我写的)

import os
import time
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunoperator
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime,timedelta


dag = DAG(dag_id="test_trigger_dag_operator",default_args={"owner":"Airflow","start_date":datetime(2020,3,9)})

trigger = TriggerDagRunoperator(
    task_id="test_trigger_dag_run_operator",trigger_dag_id="dummy_operator",conf={"message": "Hello World"},dag=dag,)

class Handler(FileSystemEventHandler):
    def on_created(self,event):
        if event.event_type == 'created':
            print("file created")
            print('Executing the dag')
            trigger

def main():
    observer = Observer()
    event_handler = FileSystemEventHandler()
    observer_path = os.getcwd()
    observer.schedule(Handler(),observer_path,recursive=False)
    observer.start()

    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()

if __name__ == '__main__':
    main()

不幸的是,使用作者代码,dag唯一要做的就是无条件地触发目标dag,并且main()永远不会被调用,即,也没有文件监视功能

我对代码进行了一些小的修改,在python_callable添加TriggerDagRunoperator属性,并在main(context,dag_run_obj)中添加了必要的args

trigger = TriggerDagRunoperator(
    task_id="test_trigger_dag_run_operator",python_callable: main,)

删除

if __name__ == '__main__':
    main()

部分。

现在文件监视程序正在运行,无论如何仍然会一次触发target-dag,并且调度程序在启动dag后立即挂起。 (这有点像while (true)所期望的那样)。如何才能以一种有效的方式使用所提供的代码

解决方法

Airflow拥有自己的服务,称为DagBag Filling,可解析您的dag并将其放入DagBag中,DagBag是您在UI和元数据数据库上都看到的dag的集合。

在对文件进行DagBag填充(解析其中的DAG)时,它实际上永无止境! 您正在DAG文件定义本身中运行该观察程序。

为避免这种情况,您需要实施Sensor

传感器-等待(轮询)特定时间的操作员,文件, 数据库行,S3键等...

,它将完全满足您的要求,但可以防止环境崩溃。 从本质上讲,这将是在自定义传感器上被覆盖的poke()方法中重新实现您的主要功能。

您可以检查contrib仓库中的任何现有传感器,或根据需要编写自定义传感器。

如果您只想从应用程序中触发dag,则可以通过REST API

向该dagid提交POST请求

两种实现方式都可以解决您的问题

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?