微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

mpi4py:如果消息数量未知,如何确保所有发送的消息都收到了?

如何解决mpi4py:如果消息数量未知,如何确保所有发送的消息都收到了?

我正在尝试在核心(工作人员)之间发送消息,但是消息的数量及其目的地是未知的。 我正在从事的工作与矩阵完成/ SGD有关。为此,我试图熟悉用于Python的MPI数据包 mpi4py

我的目标
每个工作人员都有一个本地列表,其中包含用于执行某些任务的数据。在设定的时间内,每个工作人员将不断从列表中删除项目,进行一些计算,然后将项目发送给另一位随机工作人员。该列表将很快变空,因此工作人员还必须检查其他随机工作人员发送的传入数据并将其添加到列表中。时间到时,所有工作人员都必须检索剩余的已发送消息。

由于工作人员不知道已发送给它的消息数量,因此我不知道如何确保所有消息都得到接收。在下面的尝试中,我试图让工作人员发送结束消息以表明这是最后一条消息,但此消息未收到。这是使用recv / irecv的错误时间吗?
理想的解决方案是让工作人员自己将项目添加到另一个工作人员的本地队列中。有办法吗?

import random
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0,100) for _ in range(5)]

timeout = time.time() + 2
while time.time() < timeout:
    req = comm.irecv()
    # Send data to random worker
    if len(local_queue) > 0:
        r = random.choice(others)
        comm.send(local_queue.pop(0),dest=r)
    # Try to retrieve sent messages
    status = req.test()
    if (status[0]):
        local_queue.append(status[1])
comm.Barrier()

# Send final message
for x in others:
    comm.send("Done",dest=x)
comm.Barrier()

# Retrieve remaining messages
for x in others:
    msg = comm.recv(source=x)
    while msg != "Done":
        local_queue.append(msg)
        msg = comm.recv(source=x)
    
print (rank,local_queue)

MPI.Finalize()

解决方法

首先,您的代码中存在问题-您不断发布非阻塞接收,而仅等待其中一些接收完成:

while time.time() < timeout:
    req = comm.irecv()
    # ...
    status = req.test()
    if (status[0]):
        local_queue.append(status[1])

这将在循环的每次迭代中启动一个新的非阻塞接收,而不管当前活动的接收是否已完成。我在读取mpi4py的Cython代码时遇到了麻烦,但是看起来垃圾回收时未取消活动请求,因此,这里的另一个问题可能是资源泄漏。至少在我的测试系统上,这会导致分段错误。更好的选择是仅在旧请求完成后才发布新请求:

req = comm.irecv()
while time.time() < timeout:
    # ...
    status = req.test()
    if (status[0]):
        local_queue.append(status[1])
        req = comm.irecv()

现在,对于实际问题,您的解决方案几乎是正确的。问题在于,在第一个循环中也可能收到"Done"消息。为了使其正常工作,您应该对两个循环中已收到的"Done"消息数进行计数,并继续使用第二个接收循环中第一个循环中已发布的非阻塞操作。此想法的有效实现如下:

import random
from mpi4py import MPI
import time

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

others = [i for i in range(size)]
others.remove(rank)
local_queue = [random.randint(0,100) for _ in range(5)]

done = 0

comm.Barrier()

timeout = time.time() + 2
req = comm.irecv()
while time.time() < timeout:
    # Send data to random worker
    if len(local_queue) > 0:
        r = random.choice(others)
        comm.send(local_queue.pop(0),dest=r)
    # Try to retrieve sent messages
    [status,msg] = req.test()
    if status:
        if msg != "Done":
            local_queue.append(msg)
        else:
            done += 1
        req = comm.irecv()

# Send final message
for x in others:
    comm.send("Done",dest=x)

if done == len(others):
    req.Cancel()
    req.Free()
else:
    # Retrieve remaining messages
    while True:
        msg = req.wait()
        if msg != "Done":
            local_queue.append(msg)
        else:
            done += 1
            if done == len(others):
                break
        req = comm.irecv()

print (rank,local_queue)

请注意,我在第一个循环之前放置了一个障碍,并移除了其他障碍。在MPI中,通常情况是一个级别的开始比其余级别早,因此在第一个并行操作使所有级别同步之前是一个障碍。后面的障碍没有好处。同样,在罕见但可能的情况下,秩在第一个循环中接收所有"Done"消息(在第一个循环之前常常没有障碍的发生),将有一个悬空的非阻塞接收请求,需要取消并释放。

对于理想的解决方案,MPI确实提供了所谓的单面内存操作,但是这些未在mpi4py中公开,我想是因为插入其他进程的内存与托管内存并不能很好地融合在一起。像Python这样的语言。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。