如何有效地将 Pandas 与 Python mpi4py 并行结合使用?
我正在尝试对不同目录中的 Pandas 数据框进行一些操作。步骤是-
- 转到每个特定目录并使用 glob 搜索特定文件
- 使用 bash 脚本进行一些编辑并将其转换为更合理的数据文件
- 接下来,将数据转换为“CSV”,然后开始使用 Pandas
- 最后,使用多处理 (pool.map) 执行使用多个处理器的操作。
然而,问题在于数据帧的大小(650065 行),多处理仍然很慢。我想知道是否有人知道如何用 mpi4py 有效地替换它。
这是代码-
import time
import pandas as pd
import numpy as np
import multiprocessing
import subprocess
import os
import sys
import glob,os
import mpi4py
from mpi4py import MPI
import numpy as np
# get number of processors and processor rank
# MPI communicator covering all processes launched under mpirun/mpiexec.
comm = MPI.COMM_WORLD
# Total number of MPI ranks in the job.
size = comm.Get_size()
# Rank (0-based id) of this process within the communicator.
rank = comm.Get_rank()
# Module-level accumulators appended to by processData().
# NOTE(review): these are never cleared between files, so output from one
# processed file leaks into the result returned for the next — verify
# whether cross-file accumulation is actually intended.
results = []
z_values = []
id_list = []
def processData(df):
    """Compute the N1-N2 distance for each structure and sample the largest.

    Parameters
    ----------
    df : pandas.DataFrame
        Trajectory table with (at least) columns ``id``, ``x``, ``y``, ``z``.
        Atom id 64.0 marks the N1 atom of a structure, 65.0 marks N2;
        the k-th N1 row is paired with the k-th N2 row.

    Returns
    -------
    list of (int, float, float)
        ``(structure_index, distance, z_of_N2)`` tuples taken at every
        100th position of the distance list sorted largest-first, up to
        the 10000 largest.
    """
    # Positional row indices of the N1 (id 64.0) and N2 (id 65.0) atoms.
    n1 = np.where(df.id == 64.0)[0]
    n2 = np.where(df.id == 65.0)[0]
    # One N1/N2 pair per structure.
    n_structures = n1.size

    # Cartesian coordinates of each N atom, one row per structure.
    # BUG FIX: the original selected only ['x','z'] for N2, which made the
    # (3,)-(2,) subtraction below invalid and df2[...][2] (the z access
    # further down) impossible — both arrays need all three coordinates.
    coords = df[['x', 'y', 'z']].to_numpy()
    df1 = coords[n1]
    df2 = coords[n2]

    # Distance between the paired N atoms of each structure.
    # (The original iterated arange(0, lenn1-1) and thus skipped the last
    # structure; that bound is preserved here.)
    dist_list = [[i, float(np.linalg.norm(df2[i] - df1[i]))]
                 for i in range(n_structures - 1)]

    # Sort ONCE, by distance, largest first.
    # BUG FIX: the original re-sorted inside the loop (twice per
    # iteration) and sorted without a key, i.e. by structure index rather
    # than by distance — the commented-out draft shows distance was meant.
    by_dist = sorted(dist_list, key=lambda pair: pair[1], reverse=True)

    # Sample every 100th of (up to) the 10000 largest distances.  min()
    # keeps small inputs from raising IndexError on the hard-coded 10000.
    # Local lists (not module globals) so repeated calls don't accumulate.
    ids, dists, zs = [], [], []
    for i in range(0, min(10000, len(by_dist)), 100):
        structure_id, dist = by_dist[i]
        ids.append(structure_id)
        dists.append(dist)
        zs.append(df2[structure_id][2])
    return list(zip(ids, dists, zs))
if __name__ == "__main__":
    start_time = time.time()
    # Walk every dump file under the 100K tree; each lives in its own run
    # directory together with a copy of the traj.sh post-processing script.
    for filename in glob.iglob('/home/ap86/lammps-files/100K/**/*.dump',
                               recursive=True):
        if not os.path.isfile(filename):
            continue
        print(filename[:-4])
        # BUG FIX: the original computed the directory as
        # filename[:-10][:-1], which chops one character off the directory
        # name for a standard '/traj.dump' suffix; dirname is what's meant.
        run_dir = os.path.dirname(filename)
        os.chdir(run_dir)
        print(os.getcwd())
        # Fetch the shared script into this directory and keep a pristine
        # backup of the dump.  ('scp' with local paths behaves like cp;
        # kept to preserve the original command.)
        subprocess.run(["scp", "-r", "../traj.sh", "."],
                       stdout=subprocess.PIPE, check=False)
        subprocess.run(["scp", "-r", "traj.dump", "traj.dump.orig"],
                       stdout=subprocess.PIPE, check=False)
        # Does this dump carry velocity columns?
        with open('traj.dump') as f:
            has_velocities = 'vx' in f.read()
        # traj.sh rewrites the raw LAMMPS dump into a plain whitespace-
        # separated table that pandas can read (both branches ran it).
        subprocess.run(["bash", "traj.sh"],
                       stdout=subprocess.PIPE, check=False)
        if has_velocities:
            df = pd.read_csv('traj.dump', sep=r'\s+',
                             names=["id", "type", "x", "y", "z",
                                    "vx", "vy", "vz"],
                             dtype=float)
        else:
            # NOTE(review): this branch was garbled in the original source;
            # reconstructed by symmetry with the velocity branch, reading
            # the script's output without the vx/vy/vz columns — confirm
            # the expected filename and column set against traj.sh.
            df = pd.read_csv('trajf1.dump', sep=r'\s+',
                             names=["id", "type", "x", "y", "z"],
                             dtype=float)
        df.to_csv('traj.csv', index=False)
        # Round-trip through CSV as the original did, then analyze.
        df = pd.read_csv('traj.csv')
        sortedlist = processData(df)
        print(sortedlist)
    print("--- %s seconds ---" % (time.time() - start_time))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。