微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

由于调用了 Google Cloud Storage,Google Cloud Composer 中的 Airflow DAG“似乎缺失”

如何解决由于调用了 Google Cloud Storage,Google Cloud Composer 中的 Airflow DAG“似乎缺失”

以前的问题

此问题已在 hereherehere 之前报告过,但是,我怀疑这可能是因为调用了 google 云存储。

前提/问题

以下代码位于 Google Cloud Composer 实例的 DAG 文件夹中。

以下代码块基于字符串列表“动态”生成 DAG,并且将成功运行生成名称为“new_dummy_ohhel”和“{{”的 2 个 DAG 1}}”。这些 DAG 可以访问并且可以使用。

new_dummy_hello

现在,有问题且奇怪的是,当对 Google Cloud Storage 的简单调用替换字符串列表时,仍然会创建 DAG,但在尝试访问它们时会产生错误。如果将变量 import datetime as dt from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from google.cloud.storage import Client as Client_Storage list_strings = ["ohhello","hellothere"] # ^ Variable of importance for string_val in list_strings: name_dag = f"new_dummy_{string_val[:5]}" # Just get the first 5 characters (this is important later) dag = DAG( name_dag,default_args={ "owner": "airflow","start_date": dt.datetime.Now() - dt.timedelta(days=1),# ^ yesterday "depends_on_past": False,"email": [""],"email_on_failure": False,"email_on_retry": False,"retries": 1,"retry_delay": dt.timedelta(minutes=5),},schedule_interval=None,) with dag: op_dummy_start = DummyOperator(task_id="new-dummy-task-start") op_dummy_end = DummyOperator(task_id="new-dummy-task-end") op_dummy_start >> op_dummy_end globals()[name_dag] = dag 替换为以下内容,则会发生此错误

list_strings

假设我在存储桶“some_bucket_name”中有文件“something.json”和“omgwhy.json”,那么将创建两个不可访问且不可执行的 DAG:“list_strings = [ x.name for x in Client_Storage().bucket("some_bucket_name").list_blobs() ] ”和“{{1” }}”。 (仅获得前五 (5) 个字母,因此不包括 new_dummy_somet。)这表明对存储的调用成功,但 DAG 仍然“似乎丢失”。

即使代码立即像下面这样覆盖了该列表,DAG“似乎丢失”的错误仍然会出现(注意 DAG 将是“new_dummy_omgwh”和“.” ):

new_dummy_ohhel

tl;博士与假设

当对 Google Cloud Storage 进行任何调用包括成功和未使用的调用)时,文件中创建的所有 DAG 都会成功显示,但会表明 new_dummy_hello >成功调用会导致此问题。

尝试过的解决方

  • 尝试重新启动整个 Google Cloud Composer 实例
  • 尝试通过添加环境变量重新启动 Airflow 网络服务器

解决方法

Cloud Composer 中的托管 Web 服务器在与您环境的主要工作机器(使用在环境创建期间指定的服务帐户)不同的服务帐户下运行。这意味着,如果您打算从环境主存储桶以外的存储桶中读取数据,则需要使用类似的 ACL,否则 Web 服务器将无法从存储桶中读取数据。

在您的情况下,Airflow 调度程序可能会读取存储桶(使用环境的服务帐户),但 Web 服务器不能。调度程序将为 DAG 创建一个条目,但如果 Web 服务器无法在没有遇到异常的情况下解析定义文件,您将收到“DAG x is missing”。

如果您觉得上述情况正确,您可以通过调整 Cloud Storage 存储分区 ACL 或启用 DAG serialization 来解决此问题。序列化消除了 Web 服务器解析/执行定义文件的需要,并将其全部交给调度程序,因此它也可以解决您的问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。