如何解决如何在 Windows 上有效地实现具有超时的 Path.exists() 版本
我的问题的背景是我在 Windows 上使用“Path.exists()”来检查网络路径是否可用。如果不是,“Path.exists()”会在返回 False
之前阻塞很长时间(20 秒或更长时间)。
这太长了,所以我想定义一个时间限制,之后我认为资源不可用。
我想出了一个围绕“Path”的包装类,它使用 ThreadPoolExecutor
来调用 Path
的 exists
方法并在超时后取消调用:
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self,*args,timeout: float = 1,**kwargs):
self._path = Path(*args,**kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
start_time = time.time()
while (time.time() - start_time) < self.timeout:
if future.done():
return future.result()
future.cancel()
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self,name: str) -> Any:
return getattr(self._path,name)
def __str__(self) -> str:
return str(self._path)
这有效,但即使在本地文件夹上,即使超时设置为一秒,有时也会为本地驱动器上的现有文件夹返回 False
,我认为这是由于所需的开销启动线程。
所以我想我的问题是:是否可以通过使用另一个线程/多处理实现来减少线程池引入的开销?或者有没有我没有看到的没有线程的另一种可能的解决方案?
解决方法
多亏了@IInspectable,我想我知道我的错误是什么了。
在 while 循环中轮询 future.done
方法会产生太多不必要的循环。
Future.result()
实际上已经有一个超时参数并阻塞直到未来有结果或超时:
from concurrent.futures import ThreadPoolExecutor,TimeoutError
from pathlib import Path
from typing import Any
class TimeoutPath:
executor = ThreadPoolExecutor(max_workers=1)
def __init__(self,*args,timeout: float = 1,**kwargs):
self._path = Path(*args,**kwargs)
self.timeout = timeout
def exists(self) -> bool:
future = TimeoutPath.executor.submit(self._path.exists)
try:
return future.result(self.timeout)
except TimeoutError:
return False
def get_path(self) -> Path:
return self._path
def __getattr__(self,name: str) -> Any:
return getattr(self._path,name)
def __str__(self) -> str:
return str(self._path)
在我的机器上,这对于 2 毫秒的超时非常可靠。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。