如何解决如何使用 asyncio 而不是线程在一个生成器上评估多个函数?
目标
这项工作旨在为以下问题提供有效的解决方案。
source = lambda: range(1 << 24) # for example
functions = (min,max,sum) # for example
data = tuple(source()) # from some generator
results = tuple(f(data) for f in functions)
这有效。 source()
函数可以生成任意多的值。它们被放入一个名为 tuple
的 data
中。然后使用该 functions
调用一系列 tuple
以给出 results
。这些函数迭代一个给定的参数化迭代器一次,然后给出它们的结果。这适用于小型数据集。但是,如果 source()
生成很多很多值,则必须将它们全部存储起来。这会占用内存。
可能的解决方案
有点像……
from typing import Callable,Iterable,Tuple,TypeVar
TI = TypeVar('TI')
TO = TypeVar('TO')
def magic_function(data: Iterable[TI],fxns: Iterable[Callable[[Iterable[TI]],TO]]) -> Tuple[TO,...]:
stored = tuple(data) # memory hog,prohibitively
return tuple(f(stored) for f in fxns)
source = lambda: range(1 << 24) # for example
functions = (min,sum) # for example
results = magic_function(source(),functions)
这就是我一直在努力做的。此 magic_function()
会将 data
迭代器提供给某种内部异步服务器。然后 fxns
将被赋予异步客户端——这似乎是普通的迭代器。 fxns
可以将这些客户端作为未修改的迭代器进行处理。 fxns
不能修改。可以使用 threading
模块执行此操作。不过,开销会很可怕。
格外清晰
这应该是真的。
source = lambda: range(1 << 24) # for example
functions = (min,sum) # for example
if first_method:
data = tuple(source()) # from some generator
results = tuple(f(data) for f in functions)
else:
results = magic_function(source(),functions)
无论first_method
是True
还是False
,对于source()
相同的输出和相同的functions
,results
应该always 匹配(对于单遍迭代器消耗 functions
)。第一个计算并存储整个数据集。这可能会非常浪费和缓慢。神奇的方法应该以最小的开销成本(时间和内存)来节省内存。
线程实现
这是一个使用 threading
模块的有效实现。它明显慢...
#!/usr/bin/python3
from collections import namedtuple
from random import randint
from statistics import geometric_mean,harmonic_mean,mean,median,median_high,median_low,mode
from threading import Event,Lock,Thread
from typing import *
''' https://pastebin.com/u4mTHfgc '''
int_iterable = Iterable[int]
_T = TypeVar('_T1',int,float)
_FXN_T = Callable[[int_iterable],_T]
class Server:
_it: int_iterable
slots: int
edit_slots: Lock
element: _T
available: Event
zero_slots: Event
end: bool
def __init__(self,it: int_iterable):
self._it = it
self.slots = 0
self.edit_slots = Lock()
self.available = Event()
self.zero_slots = Event()
self.end = False
def server(self,queue_length: int):
available = self.available
zero_slots = self.zero_slots
for v in self._it:
self.slots = queue_length
self.element = v
zero_slots.clear()
available.set()
zero_slots.wait()
self.slots = queue_length
self.end = True
zero_slots.clear()
available.set()
zero_slots.wait()
def client(self) -> int_iterable:
available = self.available
zero_slots = self.zero_slots
edit_slots = self.edit_slots
while True:
available.wait()
end = self.end
if not end:
yield self.element
with edit_slots:
self.slots -= 1
if self.slots == 0:
available.clear()
zero_slots.set()
zero_slots.wait()
if end:
break
class Slot:
thread: Thread
fxn: _FXN_T
server: Server
qid: int
result: Union[Optional[_T],Exception,Tuple[Exception,Exception]]
def __init__(self,fxn: _FXN_T,server: Server,qid: int):
self.thread = Thread(target = self.run,name = f'BG {id(self)} thread {qid}')
self.fxn = fxn
self.server = server
self.qid = qid
self.result = None
def run(self):
client = self.server.client()
try:
self.result = self.fxn(client)
except Exception as e:
self.result = e
try:
for _ in client: # one thread breaking won't break it all.
pass
except Exception as f:
self.result = e,f
class BranchedGenerator:
_server: Server
_queue: List[Slot]
def __init__(self,it: int_iterable):
self._server = Server(it)
self._queue = []
def new(self,fxn: _FXN_T) -> int:
qid = len(self._queue)
self._queue.append(Slot(fxn,self._server,qid))
return qid
def finalize(self):
queue = self._queue
for t in queue:
t.thread.start()
self._server.server(len(queue))
for t in queue:
t.thread.join()
def get(self,qid: int) -> _T:
return self._queue[qid].result
@classmethod
def make(cls,it: int_iterable,fxns: Iterable[_FXN_T]) -> Tuple[_T,...]:
tmp = cls(it)
qid_range = max(map(tmp.new,fxns))
tmp.finalize()
return tuple((tmp.get(qid)) for qid in range(qid_range + 1))
seq_stats = namedtuple('seq_stats',('tuple','mean','harmonic_mean','geometric_mean','median','median_high','median_low','mode'))
def bundle_bg(xs: int_iterable) -> seq_stats:
tmp = BranchedGenerator(xs)
# noinspection PyTypeChecker
ys = seq_stats(
tmp.new(tuple),tmp.new(mean),tmp.new(harmonic_mean),tmp.new(geometric_mean),tmp.new(median),tmp.new(median_high),tmp.new(median_low),tmp.new(mode)
)
tmp.finalize()
return seq_stats(
tmp.get(ys.tuple),tmp.get(ys.mean),tmp.get(ys.harmonic_mean),tmp.get(ys.geometric_mean),tmp.get(ys.median),tmp.get(ys.median_high),tmp.get(ys.median_low),tmp.get(ys.mode)
)
def bundle(xs: int_iterable) -> seq_stats:
return seq_stats(
tuple(xs),mean(xs),harmonic_mean(xs),geometric_mean(xs),median(xs),median_high(xs),median_low(xs),mode(xs)
)
def display(v: seq_stats):
print(f'Statistics of {v.tuple}:\n'
f'\tMean: {v.mean}\n'
f'\tHarmonic Mean: {v.harmonic_mean}\n'
f'\tGeometric Mean: {v.geometric_mean}\n'
f'\tMedian: {v.median}\n'
f'\tMedian High: {v.median_high}\n'
f'\tMedian Low: {v.median_low}\n'
f'\tMode: {v.mode};')
def new(length: int,inclusive_maximum: int) -> int_iterable:
return (randint(1,inclusive_maximum) for _ in range(length))
def test1() -> int:
sample = new(10,1 << 65)
struct1 = bundle_bg(sample)
display(struct1)
struct2 = bundle(struct1.tuple)
display(struct2)
matches = seq_stats(*(a == b for (a,b) in zip(struct1,struct2)))
display(matches)
return sum(((1 >> i) * (not e)) for (i,e) in enumerate(matches))
def test2():
sample = new(1000,1 << 5)
struct1 = seq_stats(*BranchedGenerator.make(
sample,(tuple,geometric_mean,mode)
))
display(struct1)
struct2 = bundle(struct1.tuple)
display(struct2)
matches = seq_stats(*(a == b for (a,e) in enumerate(matches))
def test3():
pass
if __name__ == '__main__':
exit((test2()))
Branching Generator Module (V3) [using threading] - Pastebin.com 链接具有更新的代码。从开始到输出,半秒过去了。这只是八个功能! test1()
和 test2()
都有这个速度问题。
尝试
我尝试使用 magic_function()
模块实现 asyncio
。
#!/usr/bin/python3
from asyncio import Task,create_task,run,wait
from collections import deque,namedtuple
from random import randint
from statistics import geometric_mean,mode
from typing import *
''' https://pastebin.com/ELzEaSK8 '''
int_iterable = Iterable[int]
_T = TypeVar('_T1',float)
ENGINE_T = AsyncGenerator[Tuple[_T,bool],int]
async def injector(engine: ENGINE_T,qid: int) -> AsyncIterator[int]:
while True:
try:
x,try_again = await engine.asend(qid)
except StopAsyncIteration:
break
if try_again:
continue
yield x
WRAPPER_FXN_T = Callable[[int_iterable],_T]
def wrapper(fxn: WRAPPER_FXN_T,engine: ENGINE_T,qid: int):
async def i():
# TypeError: 'async_generator' object is not iterable
return fxn(iter(x async for x in injector(engine,qid)))
return i
class BranchedGenerator:
_it: int_iterable
_engine: ENGINE_T
_queue: Union[tuple,deque]
def __init__(self,it: int_iterable):
self._it = it
self._engine = self._make_engine()
# noinspection PyTypeChecker
wait(self._engine)
self._queue = deque()
async def _make_engine(self) -> ENGINE_T: # it's like a server
lq = len(self._queue)
result = try_again = 0,True
for value in self._it:
waiting = set(range(lq))
while True:
qid = (yield result)
if len(waiting) == 0:
result = try_again
break
if qid in waiting:
waiting.remove(qid)
result = value,False
else:
result = try_again
def new(self,fxn: WRAPPER_FXN_T) -> int:
qid = len(self._queue)
self._queue.append(wrapper(fxn,self._engine,qid)())
return qid
def finalize(self):
self._queue = tuple(self._queue)
def get(self,qid: int) -> Task:
return create_task(self._queue[qid])
@classmethod
@(lambda f: (lambda it,fxns: run(f(it,fxns))))
def make(cls,fxns: Iterable[Callable[[int_iterable],_T]]) -> Tuple[_T,fxns))
tmp.finalize()
return tuple((await tmp.get(qid)) for qid in range(qid_range + 1))
seq_stats = namedtuple('seq_stats','mode'))
@(lambda f: (lambda xs: run(f(xs))))
async def bundle_bg(xs: int_iterable) -> seq_stats:
tmp = BranchedGenerator(xs)
# noinspection PyTypeChecker
ys = seq_stats(
tmp.new(tuple),tmp.new(mode)
)
tmp.finalize()
return seq_stats(
await tmp.get(ys.tuple),await tmp.get(ys.mean),await tmp.get(ys.harmonic_mean),await tmp.get(ys.geometric_mean),await tmp.get(ys.median),await tmp.get(ys.median_high),await tmp.get(ys.median_low),await tmp.get(ys.mode)
)
def bundle(xs: int_iterable) -> seq_stats:
return seq_stats(
tuple(xs),e) in enumerate(matches))
async def test2():
sample = new(1000,1 << 5)
# noinspection PyTypeChecker
struct1 = seq_stats(*await BranchedGenerator.make(
sample,e) in enumerate(matches))
async def test3():
pass
if __name__ == '__main__':
exit((test1()))
Branching Generator Module (V2) - Pastebin.com 链接具有最新版本。我不会更新嵌入的代码!如果进行了更改,pastebin 副本将包含它们。
测试
-
test1()
确保bundle_bg()
做bundle()
做的事情。他们应该做同样的事情。 -
test2()
查看BranchedGenarator.make()
的行为是否像bundle_bg()
和(传递地)像bundle()
。BranchedGenarator.make()
应该最像magic_function()
。 -
test3()
还没有任何用途。
状态
第一个测试失败。第二个测试在调用 BranchedGenerator.make()
时也有类似的错误。
[redacted]/b_gen.py:45: RuntimeWarning: coroutine 'wait' was never awaited
wait(self._engine)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
File "[redacted]/b_gen.py",line 173,in <module>
exit((test1()))
File "[redacted]/b_gen.py",line 144,in test1
struct1 = bundle_bg(sample)
File "[redacted]/b_gen.py",line 87,in <lambda>
@(lambda f: (lambda xs: run(f(xs))))
File "/usr/lib64/python3.9/asyncio/runners.py",line 44,in run
return loop.run_until_complete(main)
File "/usr/lib64/python3.9/asyncio/base_events.py",line 642,in run_until_complete
return future.result()
File "[redacted]/b_gen.py",line 103,in bundle_bg
await tmp.get(ys.tuple),File "[redacted]/b_gen.py",line 31,in i
return fxn(iter(x async for x in injector(engine,qid)))
TypeError: 'async_generator' object is not iterable
sys:1: RuntimeWarning: coroutine 'wrapper.<locals>.i' was never awaited
老实说,我是 asyncio
的新手。我不知道如何解决这个问题。
问题
有人可以帮我解决这个问题吗?!请?带有 asyncio
的这个应该和带有 threading
的那个完全一样——只是没有开销。
另一种途径
在此之前,我尝试了一个更简单的实现。
#!/usr/bin/python3
from random import randrange
from statistics import mean as st_mean,median as st_median,mode as st_mode
from typing import Any,Callable,TypeVar
''' https://pastebin.com/xhfT1njJ '''
class BranchedGenerator:
_n: Iterable[int]
_stop_value: Any
def __init__(self,n: Iterable[int],stop: Any):
self._n = n
self._stop_value = stop
@property
def new(self):
return
def wrapper1(f):
new = (yield)
# SyntaxError: 'yield' inside generator expression
yield f((y for _ in new if (y := (yield)) or True))
return
_T1 = TypeVar('_T1')
_T2 = TypeVar('_T2')
def wrapper2(ns: Iterable[_T1],fs: Iterable[Callable[[Iterable[_T1]],_T2]]) -> Tuple[_T2,...]:
def has_new():
while new:
yield True
while True:
yield False
new = True
xwf = tuple(map(wrapper1,fs))
for x in xwf:
next(x)
x.send(has_new)
next(x)
for n in ns:
for x in xwf:
x.send(n)
new = False
return tuple(map(next,xwf))
def source(n: int) -> Iterable[int]:
return (randrange(-9,9000) for _ in range(n))
normal = (tuple,st_mean,st_median,st_mode)
def test0():
sample = tuple(source(25))
s_tuple,s_mean,s_median,s_mode = wrapper2(sample,normal)
b_tuple,b_mean,b_median,b_mode = (f(s_tuple) for f in normal)
assert all((
s_tuple == b_tuple,s_mean == b_mean,s_median == b_median,s_mode == b_mode
))
def test1():
sample = source(25)
s_tuple,b_mode = (f(s_tuple) for f in normal)
print(
'Test1:'
'\nTuple',s_tuple,'\n',b_tuple,'\n==?',v0 := s_tuple == b_tuple,'\nMean',v1 := s_mean == b_mean,'\nMedian',v2 := s_median == b_median,'\nMode',s_mode,b_mode,v3 := s_mode == b_mode,'\nPasses',''.join('01'[v * 1] for v in (v0,v1,v2,v3)),'All?',all((v0,v3))
)
if __name__ == '__main__':
test0()
test1()
Branching Generator Module (V1) - Pastebin.com 链接包含更新政策。
测试
-
测试 0 说明
wrapper2()
是否做了应该做的事情。即调用所有functions
并返回结果。不保存内存,例如first_method == True
。 -
测试 1 就像
first_method == False
。sample
不是tuple
。
问题
哎哟!我可以编码,我向你保证。
File "[redacted]/branched_generator.py",line 25
yield f((y for _ in new if (y := (yield)) or True))
^
SyntaxError: 'yield' inside generator expression
我坦率地承认:这个版本过时了。 wrapper2()
显然最像 magic_function()
。
问题
由于这是更简单的实现,因此可以挽救此 wrapper2()
吗?如果没有,请不要担心。
解决方法
如果只是你担心的数据的具体化,你可以这样做
from itertools import tee
from statistics import geometric_mean,harmonic_mean,mean,median,median_high,median_low,mode
from random import randint
def magic_function(data,fxns):
return tuple(f(d) for f,d in zip(fxns,tee(data,len(fxns))))
def new(length: int,inclusive_maximum: int) -> Iterable[int]:
return (randint(1,inclusive_maximum) for _ in range(length))
sample = new(1000,1 << 5)
functions = (tuple,geometric_mean,mode)
magic_function(sample,functions)
NB tee
虽然不是线程安全的
PS:你说得对,这会消耗生成器并生成其中所有数据的 n 个副本。
我认为我们无法挽救您的问题中的 async 和 await 版本。 fxns 中的任意函数必须异步使用迭代器;他们必须(大致)在他们弹出和处理的每个项目之后释放控制流。但是 async 和 await 是合作的,我们不能强制任何给定的函数 f 在它的循环中 await (这就是为什么我们得到 TypeError
)。但是您使用 threading
确实 的解决方案有效,因为在它们的循环中的某些点,线程会被 VM 先发制人地休眠,这样就为其他函数提供了机会运行。
请记住,有一个 difference between simultaneous and concurrent。当我说函数的顺序循环就足够了时,我的意思是这样的,让其中一个消耗一个项目,然后让下一个消耗一个。这些功能不需要同时运行。事实上,您的工作线程示例不会同时运行任何内容(在 CPython VM 上。IronPython 和 Jython 可能同时运行多个 threading.Thread
,但在 CPython 上一次只有 1 个运行)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。