如何解决在气流中从谷歌存储读取 blob 对象
我正在尝试读取 google 云存储桶中的所有文件夹。 我有一个 GCS 存储桶/根文件夹路径。我正在尝试读取根文件夹中的所有子文件夹并为每个文件夹执行任务。
main.yml
它确实获取了文件夹,但是气流 UI 不会显示图形。 如果我尝试单击 dag 名称,它会说 dag_name 当前不可用。
我可以看到已完成的任务并且结果是正确的,但我无法获取 DAG。
我不知所措,不知道还能做什么。我知道当我尝试使用 folder_blobs 对象时它会失败。
请,如果有人可以帮忙。
仅供参考,这一切都可以在独立的 Python 中运行
DAG 文件应该是这样的:
from google.cloud import storage
def get_folders():
BUCKET = 'bucket-name'
PROJECT = 'project-name'
path = 'root-path'
client = storage.Client()
bucket = client.bucket(BUCKET,PROJECT)
folder_blobs = (client.list_blobs(BUCKET,prefix=path,delimiter='.'))
folder_names = []
for blob in folder_blobs:
folder_names.append(blob.name.split('/')[-2])
folder_names.sort()
return folder_names
解决方法
我尝试在我的设置中使用您的代码,我对其进行了更改,因为它在 for folder in folders:
等几行中出错(未声明文件夹对象)。在 blobs = (client.list_blobs(BUCKET,prefix=path,delimiter='.'))1
我删除了循环并将 DummyOperator 更改为 PythonOperator 以使用您的 get_folder()
函数返回 folder_names
。顺便说一下,我使用 Cloud Composer 运行 Airflow。我按照此 quick start 设置 Cloud Composer 环境。
from google.cloud import storage
from datetime import datetime #,timedelta
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import PythonOperator
from airflow.models import Variable
def get_folders():
BUCKET = 'my-test-bucket'
PROJECT = 'my-project'
path = '25868628' #just a random folder in my bucket
client = storage.Client()
bucket = client.bucket(BUCKET,PROJECT)
blobs = (client.list_blobs(BUCKET,delimiter='.'))
folder_names = []
for blob in blobs:
folder_names.append(blob.name.split('/')[-2])
folder_names.sort()
return folder_names
args = {
'owner': 'airflow',}
PARENT_DAG_NAME = 'Get_Folders_v1.02'
dag = DAG(
dag_id=PARENT_DAG_NAME,default_args=args,schedule_interval='@daily',catchup=False,start_date=datetime(2021,1,1),)
start = DummyOperator(task_id='Start',dag=dag)
end = DummyOperator(task_id='End',dag=dag)
t1 = PythonOperator(
task_id='get_folders',python_callable=get_folders,dag=dag,)
start >> t1 >> end
气流中的 DAG 文件:
输出日志:
[2021-01-07 03:40:06,448] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders Running <TaskInstance: Get_Folders_v1.02.get_folders 2021-01-06T00:00:00+00:00 [running]> on host airflow-worker-c985959c8-6fv56
[2021-01-07 03:40:07,164] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders [2021-01-07 03:40:07,163] {python_operator.py:114} INFO - Done. Returned value was: ['25868628','test_images']
[2021-01-07 03:40:07,267] {taskinstance.py:1066} INFO - Marking task as SUCCESS.dag_id=Get_Folders_v1.02,task_id=get_folders,execution_date=20210106T000000,start_date=20210107T033959,end_date=20210107T034007
[2021-01-07 03:40:07,267] {base_task_runner.py:113} INFO - Job 19: Subtask get_folders [2021-01-07 03:40:07,end_date=20210107T034007
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。