如何解决在完美任务“批处理”中使用迭代器
我正在使用 prefect 并定义一个 flow 来插入带有 cosmos db 的文档。
问题在于 query_items() 调用是可迭代的,对于大型容器,无法将所有条目保存在内存中。
我相信我的问题可以简化为:
given an iterator,how can I create batches to be processed (mapped) in a prefect flow?
示例:
def big_iterable_function_i_cannot_change():
yield from range(1000000) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched"):
some_prefect_batching_magic.map(big_iterable_function_i_cannot_change())
prefect.FlowRunner | Flow run Failed: some reference tasks Failed.
解决方法
您收到此错误是因为您没有将 big_iterable_function_i_cannot_change
定义为 task
。 prefect
实际上并不直接执行 flow
。 flow
用于制作 schedule
,(用 dask
的说法)——然后用于执行流程,(据我所知)。 prefect
中的并行化仅在与 dask executor 一起使用时发生。
这是我对您的 flow
的看法。但是,如果您无法将 big_iterable_function_i_cannot_change
的任务装饰器添加到 task
中,请将其包装在任务中。最后 - 不确定您是否可以将生成器传递给映射任务。
import prefect
from prefect import Flow,Parameter,task
@task
def big_iterable_function_i_cannot_change():
return range(5) # some large amount of work
@task
def some_prefect_batching_magic(x):
# magic code here
pass
with Flow("needs-to-be-batched") as flow:
itter_res = big_iterable_function_i_cannot_change()
post_process_res = some_prefect_batching_magic.map(itter_res)
flow.visualize()
state = flow.run()
flow.visualize(flow_state=state)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。