如何解决为任务添加延迟,直到将特定文件从存储桶中移出
我是 Airflow 的新手。我必须检查从 DAG _pickImageState
生成的文件是否从存储桶中移动(在我的情况下,我生成的文件将在被其他系统拾取时从存储桶中移走,然后不会是存储桶中的这个输出文件。从存储桶中删除文件可能需要几分钟的时间)
如何在同一个 DAG 中添加一个任务,它等待/退出直到文件从存储桶中移开,当 sample.txt 文件移开时,然后继续下一个任务。
是否有满足上述条件的运营商?请说明如何继续
解决方法
您可以根据当前的 GCSObjectExistenceSensor
创建自定义传感器修改很简单:
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
class GCSObjectNotExistenceSensor(GCSObjectExistenceSensor):
def poke(self,context: dict) -> bool:
self.log.info('Sensor checks if : %s,%s does not exist',self.bucket,self.object)
hook = GCSHook(
gcp_conn_id=self.google_cloud_conn_id,delegate_to=self.delegate_to,impersonation_chain=self.impersonation_chain,)
return not hook.exists(self.bucket,self.object)
然后在您的代码中使用传感器 GCSObjectNotExistenceSensor
,例如:
gcs_object_does_not_exists = GCSObjectNotExistenceSensor(
bucket=BUCKET_1,object=PATH_TO__FILE,mode='poke',task_id="gcs_object_does_not_exists_task",)
在移除对象 PATH_TO__FILE
之前,传感器不会让管道继续运行。
您可以使用airflow PythonOperator 来完成任务。使 Python 可调用不断戳 GCS 并检查文件是否被删除。从 GCS 中删除文件时从 Python 函数返回。
from airflow.operators.python_operator import PythonOperator
from google.cloud import storage
import google.auth
def check_file_in_gcs():
credentials,project = google.auth.default()
storage_client = storage.Client('your_Project_id',credentials=credentials)
name = 'sample.txt'
bucket_name = 'Your_Bucket_name'
bucket = storage_client.bucket(bucket_name)
while True:
stats = storage.Blob(bucket=bucket,name=name).exists(storage_client)
if not stats:
print("Returning as file is removed!!!!")
return
check_gcs_file_removal = PythonOperator(
task_id='check_gcs_file_removal',python_callable= check_file_in_gcs,#op_kwargs={'params': xyz},#Pass bucket name and other details if needed by commentating above
dag=dag
)
您可能需要安装 Python 包才能让 Google Cloud 库正常工作。请从下面安装一个。 (不确定要安装哪一个。取自我的 virtualenv)
google-api-core==1.16.0
google-api-python-client==1.8.0
google-auth==1.12.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.1
google-cloud-core==1.3.0
google-cloud-storage==1.27.0
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。