微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

我的并行处理代码有问题吗?如何使用 multiprocessing.Process 和 multiprocessing.Queue 功能?

如何解决我的并行处理代码有问题吗?如何使用 multiprocessing.Process 和 multiprocessing.Queue 功能?

我为并行处理编码,但需要很长时间。我猜代码有问题。

我想做什么?

供您参考,我想要的结果是[1,2,3,1,3]。

我的代码如下:

import time
import numpy as np
import multiprocessing

data = [1,4,10,5,6,7,8,9,11,12,100,101]
dictionary = [1,3]

data_split = np.array_split(data,4)

Q = multiprocessing.Queue()

def recog_func(data):
    result = []
    for w in data:
        if w in [1,3]:
            result.append(w)
    print(result)
    Q.put(result)
    

procs=[]
for s in data_split:
    p = multiprocessing.Process(target = recog_func,args=(s,))
    p.start()
    result = Q.get()
    procs.extend(result)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

end = time.time()

非常感谢您的帮助。

解决方法

这会正常工作。

import time
import numpy as np
import multiprocessing

data = [1,2,3,4,10,5,6,7,8,9,1,11,12,100,101]
dictionary = [1,3]

data_split = np.array_split(data,4)

Q = multiprocessing.Queue()


def recog_func(data):
    result = []
    for w in data:
        if w in [1,3]:
            result.append(w)
    print(result)
    Q.put(result)


procs = []
results = []
for s in data_split:
    p = multiprocessing.Process(target=recog_func,args=(s,))
    p.start()
    results.extend(Q.get())
    procs.append(p) 

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

end = time.time()
,

您有两个主要问题。第一个是您有列表 procs,您需要将使用以下语句创建的 Process 实例添加到其中,以便您稍后可以调用 join。也就是说,你失踪了:

    procs.append(p)

取而代之的是:

    procs.extend(result)

这是将结果存储在 procs 列表中。所以稍后当您尝试执行时:

for p in procs:
    p.join()

p 不再是一个 Process 实例而是一个 numpy.int64 实例,您现在会得到一个 AttributeError 异常,因为这种类型的对象没有 {{ 1}} 方法。

第二个问题是在下面的循环中:

join

您正在启动每个进程,然后立即等待该进程通过调用 for s in data_split: p = multiprocessing.Process(target = recog_func,)) p.start() result = Q.get() procs.extend(result) 返回其结果,然后再循环返回并启动下一个进程。因此,您仍然没有并行运行这些进程中的任何一个。即使您推迟调用 Q.get,通过调用 join,您实际上已经在等待第一个进程完成其所有处理并将其结果写入输出队列,然后再创建并启动下一个进程过程。出于所有意图和目的,第一个过程已完成处理。 在尝试阻止从任何进程检索结果之前,您必须创建并启动所有 3 个进程。但是现在所有三个进程都并行运行,您真的无法确定顺序他们完成并因此将他们的结果写入输出队列。因此,您需要有三个独立的输出队列,每个进程一个如果您希望结果按特定顺序

最后,您应该意识到创建进程的开销以及读取和写入这些多处理队列的开销,这在非多处理程序中是没有的。为了证明额外的开销是合理的,您的函数 Q.get 需要充分占用 CPU,我不相信。如果你做计时,我相信你会发现你并没有取得更好的成绩。

recog_func

打印:

import time
import numpy as np
import multiprocessing

data = [1,4)

def recog_func(data,q):
    result = []
    for w in data:
        if w in [1,3]:
            result.append(w)
    print(result)
    q.put(result)


queues = []
procs = []
results = []
for s in data_split:
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target = recog_func,q))
    procs.append(p)
    queues.append(q)
    p.start()
for q in queues:
    result = q.get()
    results.extend(result)

for p in procs:
    p.join()  # 프로세스가 모두 종료될 때까지 기다린다.

print(results)

end = time.time()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。