How to get an ordered iterator over the results of a pypeln map stage
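I am using pypeln to build pipeline stages in Python.
The problem I am facing is that pypeln does not preserve insertion order when data is pulled out of a stage.
To work around this, I attach a task_id to every element I put into the pipeline, so that each pipeline result can be matched back to its task_id.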
import pypeln as pl
from uuid import uuid4
from itertools import tee

def gen_data():
    for i in xxx:
        task_id = str(uuid4())
        data = xxx
        # ('1ddf3c4c-1ca3-4e46-af67-2b2fd9ef9df8', data), ....
        yield task_id, data

def func(x):
    task_id, data = x
    # do something
    return task_id, result

# duplicate the generator, to iterate in the same insertion order on the results
first, second = tee(gen_data())
result = pl.thread.map(func, first)

# iterate on duplicated iterator to get the same task_id order
for task_id, _ in second:
    for task_id2, task_res in result:
        # how to consume the iterator while matching the task_id, and yield the task_res ?
        ...
I basically need some itertools-style function for this.
Any idea how to do it in the most Pythonic way?
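One way the reordering asked about here could be sketched, using only the standard library: tag each item with its position instead of a UUID, and hold early arrivals in a min-heap until their turn comes. This is an illustration under assumptions, not a pypeln feature: `reorder` is a hypothetical helper name, the indices are assumed to be exactly 0, 1, 2, ... with no gaps or duplicates, and the hand-written list stands in for the stage output.

```python
import heapq
from typing import Any, Iterable, Iterator, List, Tuple


def reorder(indexed_results: Iterable[Tuple[int, Any]]) -> Iterator[Any]:
    """Yield results in index order from (index, result) pairs arriving in any order.

    Assumption: indices are exactly 0, 1, 2, ... with no gaps or duplicates.
    """
    heap: List[Tuple[int, Any]] = []  # pairs that arrived before their turn
    next_index = 0                    # index of the next result to release
    for index, result in indexed_results:
        heapq.heappush(heap, (index, result))
        # Release every buffered result that is now next in line.
        while heap and heap[0][0] == next_index:
            yield heapq.heappop(heap)[1]
            next_index += 1
    if heap:
        # Something is still buffered, so at least one index never arrived.
        raise ValueError(f"missing result for index {next_index}")


# Stand-in for the unordered output of a pipeline stage.
out_of_order = [(2, "c"), (0, "a"), (1, "b"), (3, "d")]
print(list(reorder(out_of_order)))
```

With this shape, the source would be wrapped in enumerate(...) and func would return (index, result), so the second copy produced by tee would no longer be needed. Memory use grows with how far results arrive ahead of their turn.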
Edit: here is my naive implementation:
from typing import Any, Iterable, Iterator, Tuple

def ordered_output(input: Iterable[str], output: Iterable[Tuple[str, Any]]) -> Iterator[Any]:
    cached_results = []
    it_output = iter(output)
    for expected_task_id in input:
        # try in cache
        found = False  # explicit flag, so a result that is None still counts as found
        for value in cached_results:
            task_id, res = value
            if expected_task_id == task_id:
                found = True
                cached_results.remove(value)
                break
        if found:
            yield res
            continue
        # try to find in output
        for value in it_output:
            task_id, res = value
            if expected_task_id == task_id:
                break
            # put in cache
            cached_results.append(value)
        else:
            # Output exhausted without a match. Raising StopIteration inside a
            # generator turns into RuntimeError (PEP 479), so raise a real error.
            raise LookupError(f"no result for task_id {expected_task_id!r}")
        yield res
Usage:
only_tasks = (task for task, _ in second)
for res in ordered_output(only_tasks, result):
    print(res)
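A possible refinement of ordered_output, as a sketch only: keep early arrivals in a dict keyed by task_id, which replaces the linear scan of cached_results with a constant-time lookup. The name `ordered_output_dict` is hypothetical, and the code assumes task ids are unique and hashable (true for the uuid4 strings used here).

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, Tuple


def ordered_output_dict(
    expected_ids: Iterable[Hashable],
    output: Iterable[Tuple[Hashable, Any]],
) -> Iterator[Any]:
    """Yield results in the order of expected_ids, buffering early arrivals."""
    pending: Dict[Hashable, Any] = {}  # task_id -> result that arrived early
    it_output = iter(output)
    for expected in expected_ids:
        # Pull from the output until the expected id has shown up.
        while expected not in pending:
            try:
                task_id, res = next(it_output)
            except StopIteration:
                # Output ran out before the expected id appeared.
                raise KeyError(f"no result for task id {expected!r}") from None
            pending[task_id] = res
        yield pending.pop(expected)


# Stand-in data: results arrive in a different order than the ids were issued.
ids = ["a", "b", "c"]
unordered = [("c", 3), ("a", None), ("b", 2)]
print(list(ordered_output_dict(ids, unordered)))
```

Because membership is tested on the dict rather than on the result value, a task whose result is None is still yielded correctly.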
Thanks!