微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在数据流入时迭代 GCP 中的存储桶内容

如何解决在数据流入时迭代 GCP 中的存储桶内容

我想在特定时间点迭代存储桶的内容我有以下代码来执行此操作:

from google.cloud import storage
client = storage.Client()
bucket = client.get_bucket("my_bucket")
blobs = bucket.list_blobs()
for blob in blobs:
    if "old_name" in blob.name:
        # process file

但是,当我执行此操作时,项目正在流入我的存储桶。 if 语句在理论上意味着我不会处理任何流入的新文件,但是我很担心迭代器不会包含所有 blob,因为流入的新文件会以某种方式弄乱它。你知道上面的代码是否可以工作,或者可能有问题,我错过了一些文件

解决方法

我想迁移存储桶中的数据。我想确保我迁移 代码启动时存储桶中的所有项目。如果我移民 开始后到达的物品还可以,但我不能错过任何东西 在它开始之前就在那里

既然如此,你就很好。这将起作用。 例外情况是,如果新数据覆盖了您想要复制的数据,然后再到达它,但如果没有发生覆盖,这就好了。

但是迭代器是如何工作的,以至于我们知道它会没问题。 使用迭代器将项目添加到列表中不会改变其行为吗?

如文档中所示,.list_blobs 返回部分“离线”的结果:它将使用 API 获取一页结果,并在迭代每个请求中的 itens 时透明地获取更多页面。也就是说:在每 N 个项目上发出一个新的阻塞 HTTP 请求,N 是默认页面大小 - 但不是在迭代器上消耗的每个项目上。如果您没有明确设置 max_results,它看起来像是使用 API 默认值。这意味着它将在整个迭代过程中发出一些请求,并且在这些请求之间添加的新文件要么出现在最后,要么根本不出现 - 如果这些新文件会破坏结果获取,这将是一个非常错误的行为。

实际上,如果操作员将作为存储桶的“实时视图”工作,每次迭代时都获取一个项目,则必须相当复杂 - 使用 API 的应用程序必须使用某种方式并行性,无论是多线程还是异步,其中 API 代码将能够运行和检索网络数据同时您的代码也在运行。这需要代码一个数量级的更复杂的工作 - 当然,只有在它使 API 用户受益时才有意义。如果“实时内容更新”调用可用,则复制源存储桶中的更改不仅是其职责,而且也是实施此类实施的唯一原因。

TL;DR:就去做吧——当 .list_blobs() 返回时,你有一个可以迭代的对象,并包含一个本地(与“远程”相反)存在于存储桶中的对象的静态集呼叫被处理的那一刻。

https://googleapis.dev/python/storage/latest/client.html

,

我认为这可以更简单。 list_blobs 返回 lazy iterator。为什么不在开始时实现 blob 列表,然后迭代它们呢?这样您就不会看到任何添加到存储桶中的新 blob。

from google.cloud import storage
client = storage.Client()
bucket = client.get_bucket("my_bucket")
blobs = list(bucket.list_blobs())
print(type(blobs))

>>> <class 'list'>

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。