微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为任务添加延迟,直到将特定文件从存储桶中移出

如何解决为任务添加延迟,直到将特定文件从存储桶中移出

我是 Airflow 的新手。我必须检查从 DAG _pickImageState 生成文件是否从存储桶中移动(在我的情况下,我生成文件将在被其他系统拾取时从存储桶中移走,然后不会是存储桶中的这个输出文件。从存储桶中删除文件可能需要几分钟的时间)

如何在同一个 DAG 中添加一个任务,它等待/退出直到文件从存储桶中移开,当 sample.txt 文件移开时,然后继续下一个任务。

是否有满足上述条件的运营商?请说明如何继续

解决方法

您可以根据当前的 GCSObjectExistenceSensor

创建自定义传感器

修改很简单:

from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
class GCSObjectNotExistenceSensor(GCSObjectExistenceSensor):

    def poke(self,context: dict) -> bool:
        self.log.info('Sensor checks if : %s,%s does not exist',self.bucket,self.object)
        hook = GCSHook(
            gcp_conn_id=self.google_cloud_conn_id,delegate_to=self.delegate_to,impersonation_chain=self.impersonation_chain,)
        return not hook.exists(self.bucket,self.object)
    

然后在您的代码中使用传感器 GCSObjectNotExistenceSensor,例如:

gcs_object_does_not_exists = GCSObjectNotExistenceSensor(
    bucket=BUCKET_1,object=PATH_TO__FILE,mode='poke',task_id="gcs_object_does_not_exists_task",)

在移除对象 PATH_TO__FILE 之前,传感器不会让管道继续运行。

,

您可以使用airflow PythonOperator 来完成任务。使 Python 可调用不断戳 GCS 并检查文件是否被删除。从 GCS 中删除文件时从 Python 函数返回。

from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import google.auth

def check_file_in_gcs():
    credentials,project = google.auth.default()
    storage_client = storage.Client('your_Project_id',credentials=credentials)
    name = 'sample.txt'   
    bucket_name = 'Your_Bucket_name'
    bucket = storage_client.bucket(bucket_name)
    while True:
        stats = storage.Blob(bucket=bucket,name=name).exists(storage_client)
        if not stats:
           print("Returning as file is removed!!!!")
           return

check_gcs_file_removal = PythonOperator(
            task_id='check_gcs_file_removal',python_callable= check_file_in_gcs,#op_kwargs={'params': xyz},#Pass bucket name and other details if needed by commentating above 
            dag=dag
        )

您可能需要安装 Python 包才能让 Google Cloud 库正常工作。请从下面安装一个。 (不确定要安装哪一个。取自我的 virtualenv)

google-api-core==1.16.0
google-api-python-client==1.8.0
google-auth==1.12.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-cloud-core==1.3.0
google-cloud-storage==1.27.0

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。