How to correctly share a Manager dict between Process subclasses
What I want to do is share a dictionary between subclasses of Process: when one process updates the dictionary, the other is notified to use it. This is illustrated in the code below, where MyProducer starts filling the dictionary and, on each iteration, fires an event to notify MyConsumer to process the dictionary. Everything works, apart from the dictionary being empty in MyConsumer...
from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    increment = 0

    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while self.increment < 20:
            self.dictionary[self.increment] = self.increment + 10
            self.increment = self.increment + 1
            print("From producer: ", self.dictionary)
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.dictionary)
            self.event.clear()

if __name__ == "__main__":
    with Manager() as manager:
        state_dict = manager.dict()
        state_ready = Event()
        producerprocess = MyProducer(state_dict, state_ready)
        consumerprocess = MyConsumer(state_dict, state_ready)
        producerprocess.start()
        consumerprocess.start()
The output is
Process MyProducer-2:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 827, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "main.py", line 13, in run
    self.dictionary[self.increment]=self.increment+10
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 831, in _callmethod
    self._connect()
  File "/usr/lib/python3.8/multiprocessing/managers.py", line 818, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 502, in Client
    c = SocketClient(address)
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 630, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
Update

My aim is to understand why the dictionary doesn't work with Process subclasses. I know you can find working examples all over the internet. In fact, I already have a working solution: just replacing the dict with a queue works. What I want to understand is why the dict doesn't.
from multiprocessing import Process, Queue, Event

class MyProducer(Process):
    increment = 0

    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while self.increment < 20:
            self.queue.put([self.increment, self.increment + 10])
            self.increment = self.increment + 1
            print("From producer: ", self.queue.qsize())
            self.event.set()
            while self.event.is_set() is True:
                increment = self.increment
                increment = increment + 1

class MyConsumer(Process):
    def __init__(self, queue, event):
        Process.__init__(self)
        self.queue = queue
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            print("From consumer: ", self.queue.qsize())
            self.event.clear()

if __name__ == "__main__":
    state_queue = Queue()
    state_ready = Event()
    producerprocess = MyProducer(state_queue, state_ready)
    consumerprocess = MyConsumer(state_queue, state_ready)
    producerprocess.start()
    consumerprocess.start()
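As background on why the two behave so differently (a hedged aside, not from the original post): an mp.Queue is built on a pipe whose handles are passed to the child directly, while a Manager dict is only a proxy - every operation on it is a message to the Manager's server process, so the proxy is useless once that server is unreachable. A minimal illustration:

```python
from multiprocessing import Manager

# A Manager "dict" is not a dict at all but a proxy object; each
# operation round-trips through the Manager's server process, so the
# proxy only works while that server is alive.
if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        print(type(d).__name__)  # DictProxy, not dict
        d["k"] = 1               # a message to the server process
        snapshot = dict(d)       # local plain-dict copy, no server needed
        print(snapshot)
    # after the with-block the server is gone; touching d now raises
    # an error much like the one in the traceback above
```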
Answer
FYI, I see much the same kind of death with this simpler program:
from multiprocessing import Process, Manager, Event

class MyProducer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        print("at producer start", self.val.value)
        self.val.value = 42
        self.event.set()

class MyConsumer(Process):
    def __init__(self, value, event):
        Process.__init__(self)
        self.val = value
        self.event = event

    def run(self):
        self.event.wait()
        print("From consumer: ", self.val.value)

if __name__ == "__main__":
    with Manager() as manager:
        state_value = manager.Value('i', 666)
        state_ready = Event()
        producerprocess = MyProducer(state_value, state_ready)
        consumerprocess = MyConsumer(state_value, state_ready)
        producerprocess.start()
        consumerprocess.start()
The implication is that no kind of object obtained from a Manager survives being attached as an attribute to an mp.Process object, which then has to be reconstructed - by the magic that does such things - in the worker process. The information needed to connect to the Manager server process appears to be lost (whether that's a socket on Linux-y systems or a named pipe on Windows).
You could file a bug report, but until then there's nothing to be done about it short of rewriting the code so it doesn't use a Manager, or passes the Manager objects explicitly to functions.
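A sketch of that second workaround, with invented names (none of them are from the original code): pass the proxy as an argument to a plain target function instead of storing it on a Process subclass. The proxy is pickled as part of the args and reconstructed in the child with a live connection to the server:

```python
from multiprocessing import Process, Manager, Event

# Illustrative sketch: the Manager dict travels as an explicit
# argument, not as an attribute of a Process subclass.
def produce(dictionary, event):
    for i in range(5):
        dictionary[i] = i + 10  # round-trips through the Manager server
    event.set()                 # tell the main process we're done

if __name__ == "__main__":
    with Manager() as manager:
        shared = manager.dict()
        ready = Event()
        worker = Process(target=produce, args=(shared, ready))
        worker.start()
        ready.wait()
        print("main sees:", dict(shared))  # {0: 10, 1: 11, 2: 12, 3: 13, 4: 14}
        worker.join()
```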
A bug report could be resolved in either of two ways: (1) make it "work"; or (2) change the code to raise an exception when such an attempt is made.
Another possibility (which I haven't tried): if you only ever run on Linux, you could skip the __name__ == "__main__" test and hope the Manager connection info survives fork().
EDIT

I opened an issue on the Python project's tracker:

https://bugs.python.org/issue41660
WORKAROUND
As things unfolded in the Python issue report, it turns out the "problem" here isn't really about how things get set up, but that your code ignores the need to shut workers down cleanly. Just adding this line at the end of your code (the dict version - the one you care about):

producerprocess.join()

is enough so that, now, on my box (Win 10 running Python 3.8.5), it produces the output you expect. But then it hangs forever, because your consumer .wait()s on the Event forever.
My guess (I'm 80% sure it's right): without the .join(), the main process goes on to run interpreter shutdown code (there's nothing left for it to do!), which starts forcibly destroying things the multiprocessing implementation still needs in order to work correctly.

With the .join(), the main process blocks until the producer is finished - no shutdown code gets started during that time, and the .join() explicitly tells the producer process to wrap up its (elaborate!) multiprocessing dance cleanly.

It may still leave the consumer process in a damaged state, but, if so, we'll never see any evidence of that, because the consumer is blocked forever on its self.event.wait().

In a real program, you should do whatever it takes to shut down the consumer process cleanly too.
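One hedged sketch of what a clean consumer shutdown could look like (the sentinel key and all names here are invented, not part of the original code): give the consumer an explicit "stop" signal so its run() can return instead of blocking forever.

```python
from multiprocessing import Process, Manager, Event

STOP = "__stop__"  # invented sentinel key, not from the original code

class MyConsumer(Process):
    def __init__(self, dictionary, event):
        Process.__init__(self)
        self.dictionary = dictionary
        self.event = event

    def run(self):
        while True:
            self.event.wait()
            self.event.clear()
            if STOP in self.dictionary:
                return  # clean exit instead of waiting forever
            print("From consumer: ", dict(self.dictionary))

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()
        ev = Event()
        consumer = MyConsumer(d, ev)
        consumer.start()
        d[0] = 10
        ev.set()          # one unit of work for the consumer
        import time
        time.sleep(0.5)   # crude; the full program below does this without sleeps
        d[STOP] = True
        ev.set()          # tell the consumer to quit
        consumer.join()   # main blocks here, keeping the Manager alive
```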
FULL CODE

Here's a complete program, showing idiomatic Python and best practices for parallel programming: everything shuts down cleanly, with no "busy loops", no races, and no deadlocks. The implementation of State is more elaborate than this particular problem needs, but it illustrates a powerful approach worth learning.
import multiprocessing as mp

P, C, F = 1, 2, 4 # bit flags for state values

# Unusual synchronization appears to be wanted here:
# After a producer makes a mutation, it must not make another
# before the consumer acts on it. So we'll say we're in state
# P when the producer is allowed to mutate, and in state C
# when there's a mutation for the consumer to process. Another
# state - F (for "finished") - tells the consumer it's time to
# quit. The producer stops on its own when it gets tired of
# mutating ;-)
class State:
    def __init__(self):
        # Initial state is empty - everyone is blocked.
        # Note that we do our own locking around the shared
        # memory, via the condition variable's mutex, so
        # it would be pure waste for the Value to have
        # its own lock too.
        self.state = mp.Value('B', lock=False)
        self.changed = mp.Condition()

    # Wait for state to change to one of the states in the
    # flag mask `what`. Return the bit flag of the state
    # that succeeded.
    def waitfor(self, what):
        with self.changed:
            while not (self.state.value & what):
                self.changed.wait()
            return self.state.value

    # Force state to (bit flag) `what`, and notify waiters
    # to wake up and see whether it's the state they're
    # waiting for.
    def setwhat(self, what):
        with self.changed:
            self.state.value = what
            self.changed.notify_all()

class Base(mp.Process):
    def __init__(self, dictionary, state):
        super().__init__()
        self.dictionary = dictionary
        self.state = state

class MyProducer(Base):
    def __init__(self, *args):
        super().__init__(*args)
        self.increment = 0

    def run(self):
        while self.increment < 20:
            self.state.waitfor(P)
            self.dictionary[self.increment] = self.increment + 10
            self.state.setwhat(C)
            # Whether the producer or the consumer prints the dict
            # first isn't forced - and, indeed, they can both print at
            # the same time, producing garbled output. Move the
            # print() above the setwhat(C) to force the producer
            # to print first, if desired.
            print("From producer: ", self.dictionary)
            self.increment += 1

class MyConsumer(Base):
    def run(self):
        while self.state.waitfor(C | F) != F:
            print("From consumer: ", self.dictionary)
            self.state.setwhat(P)

def main():
    with mp.Manager() as manager:
        state_dict = manager.dict()
        state_state = State()
        producerprocess = MyProducer(state_dict, state_state)
        consumerprocess = MyConsumer(state_dict, state_state)
        producerprocess.start()
        consumerprocess.start()

        # The producer is blocked waiting for state P, and the
        # consumer is blocked waiting for state C (or F). The
        # loop here counts down 5 seconds, so you can verify
        # by eyeball that the waits aren't "busy" (they consume
        # essentially no CPU cycles).
        import time
        for i in reversed(range(5)):
            time.sleep(1)
            print(i)

        state_state.setwhat(P) # tell the producer to start!
        producerprocess.join() # and wait for it to finish
        # wait for the consumer to finish eating the last mutation
        state_state.waitfor(P)
        # tell the consumer we're all done
        state_state.setwhat(F)
        consumerprocess.join()

if __name__ == "__main__":
    main()