如何解决Cloud Composer Airflow Dynamic DAG中的“ Dag似乎丢失”错误
我在Google Cloud Composer中创建了一个动态Airflow DAG,并在网络服务器中列出并运行(回填),没有错误。 但是,存在一些问题:
- 当点击网址中的DAG时,它说“ DAG似乎是 失踪”
- 看不到图形视图/树形视图,显示上面的错误
- 无法像上面显示的错误一样手动触发DAG
尝试修复此问题几天...任何提示都会有所帮助。谢谢!
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from google.cloud import storage
from airflow.models import Variable
import json
args = {
'owner': 'xxx','start_date':'2020-11-5','provide_context': True
}
dag = DAG(
dag_id='dynamic',default_args=args
)
def return_bucket_files(bucket_name='xxxxx',**kwargs):
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blobs = bucket.list_blobs()
file_list = [blob.name for blob in blobs]
return file_list
def dynamic_gcs_to_gbq_etl(file,**kwargs):
mapping = json.loads(Variable.get("xxxxx"))
database = mapping[0][file]
table = mapping[1][file]
task=GoogleCloudStorageToBigQueryOperator(
task_id= f'gcs_load_{file}_to_gbq',bucket='xxxxxxx',source_objects=[f'{file}'],destination_project_dataset_table=f'xxx.{database}.{table}',write_disposition="WRITE_TRUNCATE",autodetect=True,skip_leading_rows=1,source_format='CSV',dag=dag)
return task
start_task = DummyOperator(
task_id='start',dag=dag
)
end_task = DummyOperator(
task_id='end',dag=dag)
push_bucket_files = PythonOperator(
task_id="return_bucket_files",provide_context=True,python_callable=return_bucket_files,dag=dag)
for file in return_bucket_files():
gcs_load_task = dynamic_gcs_to_gbq_etl(file)
start_task >> push_bucket_files >> gcs_load_task >> end_task
解决方法
此问题意味着Web服务器无法侧放DAG包-此问题很可能不是您的DAG特有的。
我的建议是立即尝试重新启动Web服务器(通过installation of some dummy package)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。