微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在完美任务“批处理”中使用迭代器

如何解决在完美任务“批处理”中使用迭代器

我正在使用 prefect 并定义一个 flow 来插入带有 cosmos db 的文档。

问题在于 query_items() 调用是可迭代的,对于大型容器,无法将所有条目保存在内存中。

我相信我的问题可以简化为:

  • given an iterator,how can I create batches to be processed (mapped) in a prefect flow?

示例:

def big_iterable_function_i_cannot_change():
    yield from range(1000000) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched"):
    some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())

上面的代码或类似的代码会给我一个错误

prefect.FlowRunner | Flow run Failed: some reference tasks Failed.

解决方法

您收到此错误是因为您没有将 big_iterable_function_i_cannot_change 定义为 taskprefect 实际上并不直接执行 flowflow 用于制作 schedule,(用 dask 的说法)——然后用于执行流程,(据我所知)。 prefect 中的并行化仅在与 dask executor 一起使用时发生。

这是我对您的 flow 的看法。但是,如果您无法将 big_iterable_function_i_cannot_change 的任务装饰器添加到 task 中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。

import prefect
from prefect import Flow,Parameter,task

@task
def big_iterable_function_i_cannot_change():
    return range(5) # some large amount of work

@task
def some_prefect_batching_magic(x):
    # magic code here
    pass


with Flow("needs-to-be-batched") as flow:
    itter_res = big_iterable_function_i_cannot_change()
    post_process_res = some_prefect_batching_magic.map(itter_res)

flow.visualize()
state = flow.run()


flow.visualize(flow_state=state)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。