微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

mpi4py 和 ctypes - 每当单个线程退出时,带有 MPI 的 Python 程序就会退出通过 C++

如何解决mpi4py 和 ctypes - 每当单个线程退出时,带有 MPI 的 Python 程序就会退出通过 C++

(这里使用的代码和我今天贴的另一个问题类似,只是两个问题不同)

我正在使用带有 python3 接口的复杂 C++ 代码(使用 ctypes)。为简单起见,我在下面复制了一个简单的 C++ 代码来构建我的问题。函数 exitOrNot() 接受一个参数 index 并在它等于 7 时退出

multi.cpp

#include <iostream> 
#include <cstdlib>

extern "C" int exitOrNot(int index) {
    int ret;
    if(index == 7) {
        std::cout << "EXITING... \n";
        std::exit(EXIT_FAILURE);
    }
    else {
        sleep(1)
        ret = index;
    }
    return ret;
}

我使用以下命令为上述程序 libmulti.so 创建了一个共享库 - g++ -fPIC -shared -o libmulti.so multi.cpp

在我的 Python 模块中,我使用 mpi4py 进行并行处理。在这个简单的例子中,我有两个函数 - ctypes_exitOrNot() 用于接收之前的 C++ 函数,multi() 用于迭代多个输入参数(在本例中,5*100=500 个参数)。

multi.py

import numpy as np
from mpi4py import MPI
import ctypes

_multi = ctypes.CDLL('./libmulti.so')

def ctypes_exitOrNot(index):
    ret = _multi.exitOrNot(ctypes.c_int(index))
    return ret

def multi(N):
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    N_threads = comm.Get_size()-1 # Thread 0 is used only for IO
    
    N_jobs = N
    N_jobs_per_thread = int(np.ceil( float(N_jobs)/float(N_threads) ))
        
    print("\nrank",rank,"N_threads",N_threads,"N_jobs_per_thread",N_jobs_per_thread,"\n")

    indices = range(rank-1,N_jobs,N_threads)
    
    if rank == 0: # IO thread             
        completed_ranks = []
        while True: # Wait to receive data until all threads are done
            status = MPI.Status()
            data = comm.recv(source=MPI.ANY_SOURCE,tag=MPI.ANY_TAG,status=status)
            received_rank = status.Get_source()
            
            if data == "done":
                completed_ranks.append(received_rank)
            else:               
                index_system = data["index_system"]    
                with open("completed.txt","a") as file_completed:
                    file_completed.write(str(index_system) + '\n')
                
            if len(completed_ranks)==N_threads:
                print("\nrank 0 -- all threads done; completed_ranks",completed_ranks,"\n")
                break

    else:
        for index_rank,index_job in enumerate(indices):
            index_system = index_job
            
            print("rank","index_system",index_system,"started")

            # EXITING CONDITION FROM EXTENAL C++
            ret = ctypes_exitOrNot(index_system)
            data = {"index_system":ret}

            print("rank","ended")
            
            comm.send(data,dest=0,tag=index_system)
        
        # This thread is done
        data = "done"
        comm.send(data,tag=index_system)
    
if __name__ == '__main__':
    multi(5*100)

上面代码的重要一行是ret = ctypes_exitOrNot(index_system)。传递给 exitOrNot() 的整数参数是索引 0-499,当它们被分成不同的线程时,索引为 7 的线程退出。作为参考,我的终端命令是 - 'mpiexec -n 6 python3 multi.py'

我终于来回答我的问题了。当一个线程通过 C++ 退出时(就像在这种情况下),整个程序退出,即所有其他线程也退出。但是,当从 Python 内部完成相同的退出时,其他线程会继续运行。

有没有办法让其他线程即使其中一个通过 C++ 退出也能继续运行? (我无法编辑实际的 C++ 文件本身)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。