如何解决使用“as_completed”时,如何正确清除“dask.distributed”中的未完成期货?
我有一个很大的作业队列,我想提前中止。 后来的作业依赖于较早的作业,因此我无法一次将所有作业排队。考虑以下 MWE:
from dask.distributed import Client,as_completed
import numpy as np
def work(_):
return np.random.random(size=(100_000,50))
def main(func):
with Client() as client:
futures = client.map(func,range(10),pure=False) # pre-determined work
ac = as_completed(futures,with_results=True)
for future,result in ac:
new_future = client.submit(func,pure=False) # work depends on earlier output
ac.add(new_future)
break # Some condition is met,remaining jobs are irrelevant & can be aborted/discarded
ac.clear()
if __name__ == '__main__':
main(work)
上面的例子一般会产生如下错误:
distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:58617'],work-24a67a6d-4479-4f62-9865-bd48442198c4
nonetype: None
distributed.client - WARNING - Couldn't gather 1 keys,rescheduling {'work-24a67a6d-4479-4f62-9865-bd48442198c4': ('tcp://127.0.0.1:58617',)}
我希望 as_completed.clear
能够干净利落地处理剩余的期货。
我还做了一个变化来跟踪所有期货,并在调用 as_completed.clear
之前取消它们,但它产生了类似的结果。
是否有适当的方法来实现这种预期行为?
注意事项:
- 如果
work
返回None
,问题似乎仍然发生,只是频率降低了。 - 我在我的 Windows 机器上测试了这个,并从一个 Ubuntu docker 容器中测试。
- 我使用了 Python 3.8.3 和 dask/distributed 2020.12.0
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。