
Shared list with pathos multiprocessing raises "digest sent was rejected" error

How to fix the "digest sent was rejected" error when using a shared list with pathos multiprocessing

I am trying to use multiprocessing to generate complex, unpicklable objects, as per the following code snippet:

from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool


class Facility:

    def __init__(self):
        self.blocks = Manager().list()

    def __process_blocks(self, block):
        designer = block["designer"]
        apply_terrain = block["terrain"]
        block_type = self.__block_type_to_string(block["type"])
        block = designer.generate_block(
            block_id=block["id"],
            block_type=block_type,
            anchor=Point(float(block["anchor_x"]),
                         float(block["anchor_y"]),
                         float(block["anchor_z"])),
            pcu_anchor=Point(float(block["pcu_x"]), float(block["pcu_y"]), 0),
            corridor_width=block["corridor"],
            jb_height=block["jb_connect_height"],
            min_Boxes=block["min_Boxes"],
            apply_terrain=apply_terrain)
        self.blocks.append(block)

    def design(self, apply_terrain=False):
        designer = FacilityBuilder(
            string_locator=self._string_locator,
            string_router=self._string_router,
            Box_router=self._Box_router,
            sorter=self._sorter,
            tracker_configurator=self._tracker_configurator,
            config=self._config)
        blocks = [block.to_dict() for index, block in self._store.get_blocks().iterrows()]
        for block in blocks:
            block["designer"] = designer
            block["terrain"] = apply_terrain
        with ProcessingPool() as pool:
            pool.map(self.__process_blocks, blocks)

(I am struggling to reproduce this with simpler code, so I am showing the actual code.)

I need to update a shareable variable, so I initialise a class-level variable using multiprocessing.Manager as follows:

self.blocks = Manager().list()

This leaves me with the following error (partial stack trace only):

File "C:\Users\Paul.Nel\Documents\repos\autopV\.autopv\lib\site-packages\dill\_dill.py", line 481, in load
    obj = StockUnpickler.load(self)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py", line 933, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py", line 783, in __init__
    self._incref()
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\managers.py", line 837, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\connection.py", line 513, in Client
    answer_challenge(c, authkey)
File "C:\Users\Paul.Nel\AppData\Local\Programs\Python\python39\lib\multiprocessing\connection.py", line 764, in answer_challenge
    raise AuthenticationError('digest sent was rejected')
multiprocessing.context.AuthenticationError: digest sent was rejected

As a last resort I tried to use the standard Python implementation of ThreadPool to try to circumvent the problem, but that did not go well either. I have read many similar questions but have not yet found a solution to this particular problem. Is it an issue with pickle, or with the way dill interfaces with pathos?

EDIT: So I managed to replicate it with sample code as follows:

import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool


class MyComplex:

    def __init__(self, x):
        self._z = x * x

    def me(self):
        return math.sqrt(self._z)


class Starter:

    def __init__(self):
        manager = Manager()
        self.my_list = manager.list()

    def _f(self, value):
        print(f"{value.me()} on {os.getpid()}")
        self.my_list.append(value.me)

    def start(self):
        names = [MyComplex(x) for x in range(100)]
        with ProcessingPool() as pool:
            pool.map(self._f, names)


if __name__ == '__main__':
    starter = Starter()
    starter.start()

The error occurs when I add multiprocessing.Manager.

Solution

So I have managed to solve this problem. It would still be great if someone like mmckerns, or anyone with more multiprocessing knowledge than me, could comment on why this is a solution.

The problem seems to be declaring Manager().list() in __init__. The following code works without any problems:

import os
import math
from multiprocessing import Manager
from pathos.multiprocessing import ProcessingPool


class MyComplex:

    def __init__(self, x):
        self._z = x * x

    def me(self):
        return math.sqrt(self._z)


class Starter:

    def _f(self, value):
        print(f"{value.me()} on {os.getpid()}")
        return value.me()

    def start(self):
        # The Manager list is now created here, local to the pool run,
        # rather than being stored on self in __init__.
        manager = Manager()
        my_list = manager.list()
        names = [MyComplex(x) for x in range(100)]

        with ProcessingPool() as pool:
            my_list.append(pool.map(self._f, names))
        print(my_list)


if __name__ == '__main__':
    starter = Starter()
    starter.start()

Here I declare the list locally within the ProcessingPool operation. If I want, I can assign the result to a class-level variable afterwards.
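The closing remark, collecting the mapped results first and only then assigning them to an attribute on the class, can be sketched as follows. This is a minimal illustration, not the answer's exact code: to keep it dependency-free it uses the standard library multiprocessing.Pool instead of pathos (assuming the pattern carries over), so the worker `_f` is a module-level function that the standard pickle can serialise, and `Starter.results` is a hypothetical attribute added for the demonstration:

```python
import math
from multiprocessing import Pool


class MyComplex:
    def __init__(self, x):
        self._z = x * x

    def me(self):
        return math.sqrt(self._z)


def _f(value):
    # Module-level worker, so the standard pickle can serialise it.
    return value.me()


class Starter:
    def __init__(self):
        # A plain list: no Manager proxy is stored on self, so nothing
        # authkey-sensitive gets pickled into the worker processes.
        self.results = []

    def start(self):
        names = [MyComplex(x) for x in range(10)]
        with Pool() as pool:
            # Collect the mapped results first, then assign them to the
            # instance attribute after the pool has done its work.
            self.results = pool.map(_f, names)


if __name__ == '__main__':
    starter = Starter()
    starter.start()
    print(starter.results[:3])  # prints [0.0, 1.0, 2.0]
```

Because the shared state only ever lives in the parent process, the workers return plain values and never have to reconnect to a Manager.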
