微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python pathos.multiprocessing 处理池为什么池交换信息?

如何解决Python pathos.multiprocessing 处理池为什么池交换信息?

简介

我有大量数据需要对其进行特定计算。不幸的是,如果我一个一个地运行计算,整个过程将需要一天时间。这就是我在具有 32 个 cpu 的机器上使用 pathos.multiprocessing ProcessingPool 的原因。这样整个过程大约需要 30 分钟。

问题

预期的行为是所有计算并行运行并且彼此完全独立。我注意到的是,这对于少量计算(例如 40)来说是正确的,但是如果我将数字增加到 90,数据就会变得混杂。进程似乎彼此“通信”……换句话说,它们以某种方式可以访问相同的变量。

问题

知道发生了什么吗?

有用链接https://pathos.readthedocs.io/en/latest/pathos.html

代码

下面的代码是我机器上的简化版本,但想法是一样的。所以,这是代码

from pathos.multiprocessing import ProcessingPool,cpu_count

class TestMultiprocessing:
    def __init__(self):
        self.number = 0

    def do_something(self,args):
        for arg in args:
            self.number += arg
        return self.number

    def run(self):
        # Generate a big list
        l = []
        for i in range(0,100):
            l.append([1,2,3])

        pool = ProcessingPool(cpu_count())
        results = pool.imap(self.do_something,l)
        pool.close()
        pool.join()
        pool.clear()
        results = list(results)

        for result in results:
            print(result)

tm = TestMultiprocessing()
tm.run()

更新

我无法展示示例代码,因为它是机密的,但我可以简要说明代码的工作原理。

一个包含几千行数据(~80 000 行)的 CSV 文件。我编写的代码必须运行 800 个任务,每个任务执行从 CSV 文件获取数据的特定计算。

机器有 N 个 cpu,因此 Pathos 将所有 800 个任务分解为 M 个组,每个组包含 N 个任务(如果 cpu数量是偶数 M * N = 800)。 Pathos 创建了 N 个并行执行所需计算的池。计算完成后,每个池都会生成一个字典列表,并继续执行下一组任务 - 重复该过程 M 次。

列表中的每个字典都包含一个与任务编号相对应的唯一键,可以是 1 到 800。这样我就可以检查池 X 是否生成与任务 X 相关的字典(例如 103)。期望是池 X 生成的结果只包含键 X。

需要说明的是,上述过程消耗了机器的大部分内存!

在详细观察生成的结果后,我注意到以下几点。如果我运行 N 个任务(例如,4 个任务对应于具有 4 个 cpu 的机器),每个字典列表都包含与特定任务对应的键。例如,池 X 生成一个带有键 X 的字典列表。

但是如果我运行超过 N 个任务,则对应于大于 N 的任务的字典包含来自先前任务的数据。例如,池 Y 生成一个包含键 Y、X 等的字典列表。

我的结论是池X没有清除之前计算的数据(内存),下一个任务继承了上一个任务的数据。

解决方

即使我使用 pool.clear() 也没有帮助我解决问题。我找到的临时解决方案如下。

我“手动”将任务分解为组。

import numpy as np
import math

# Get the number of cpus
cpus = cpu_count()

# Check how many groups of tasks we have to run
chunks = math.ceil(len(l)/cpus)

# Break down the list of tasks to chunks/groups
l_chunks = np.array_split(l,chunks)

# Loop the chunks
for l_chunk in l_chunks:
    pool = ProcessingPool(cpus)
    results = pool.imap(self.do_something,l_chunk.tolist())
    pool.close()
    pool.join()
    pool.clear()
    results = list(results)
    
    for result in results:
        print(result)

这为我解决了问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。