微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用“as_completed”时,如何正确清除“dask.distributed”中的未完成期货?

如何解决使用“as_completed”时,如何正确清除“dask.distributed”中的未完成期货?

我有一个很大的作业队列,我想提前中止。 后来的作业依赖于较早的作业,因此我无法一次将所有作业排队。考虑以下 MWE:

from dask.distributed import Client,as_completed
import numpy as np


def work(_):
    return np.random.random(size=(100_000,50))


def main(func):
    with Client() as client:
        futures = client.map(func,range(10),pure=False)    # pre-determined work
        ac = as_completed(futures,with_results=True)
        for future,result in ac:
            new_future = client.submit(func,pure=False)  # work depends on earlier output
            ac.add(new_future)
            break # Some condition is met,remaining jobs are irrelevant & can be aborted/discarded
        ac.clear()


if __name__ == '__main__':
    main(work)

上面的例子一般会产生如下错误

distributed.scheduler - ERROR - Workers don't have promised key: ['tcp://127.0.0.1:58617'],work-24a67a6d-4479-4f62-9865-bd48442198c4
nonetype: None
distributed.client - WARNING - Couldn't gather 1 keys,rescheduling {'work-24a67a6d-4479-4f62-9865-bd48442198c4': ('tcp://127.0.0.1:58617',)}

我希望 as_completed.clear 能够干净利落地处理剩余的期货。 我还做了一个变化来跟踪所有期货,并在调用 as_completed.clear 之前取消它们,但它产生了类似的结果。

是否有适当的方法来实现这种预期行为?

注意事项:

  • 如果 work 返回 None,问题似乎仍然发生,只是频率降低了。
  • 我在我的 Windows 机器上测试了这个,并从一个 Ubuntu docker 容器中测试。
  • 我使用了 Python 3.8.3 和 dask/distributed 2020.12.0

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。