如何解决使用 Pool、Process 或 Queue 无法进行多重处理
我正在使用一个脚本,它从一个很大的文本文件中读取化学化合物,我正尝试实现多处理,使文本文件被分块处理。但由于某种原因它似乎无法正常工作,永远无法越过脚本中树搜索(tree search)的那一行。我尝试过 multiprocessing.Pool、Queue 和 Process 等不同方式,但不确定哪里出错了。
这是原始脚本:
def _select_stocks(finder,stocks):
stocks = list(stocks)
try:
module = importlib.import_module("custom_stock")
except ModuleNotFoundError:
pass
else:
if hasattr(module,"stock"):
finder.stock.load_stock(module.stock,"custom_stock")
stocks.append("custom_stock")
finder.stock.select_stocks(stocks or finder.stock.available_stocks())
def _process_multi_smiles(filename, finder, output_name):
    """Run a tree search for every SMILES in *filename* and save results to HDF5.

    :param filename: path to a text file with one SMILES string per line
    :param finder: a fully configured AiZynthFinder instance
    :param output_name: destination HDF5 file; defaults to "output.hdf5"
    """
    output_name = output_name or "output.hdf5"
    with open(filename, "r") as fileobj:
        smiles_list = [row.strip() for row in fileobj]

    collected = defaultdict(list)
    for smi in smiles_list:
        finder.target_smiles = smi
        finder.prepare_tree()
        search_time = finder.tree_search()
        finder.build_routes()
        stats = finder.extract_statistics()
        logger().info(f"Done with {smi} in {search_time:.3} s")
        for key, value in stats.items():
            collected[key].append(value)
        collected["top_scores"].append(
            ",".join("%.4f" % score for score in finder.routes.scores)
        )
        collected["trees"].append(finder.routes.dicts)

    data = pd.DataFrame.from_dict(collected)
    with warnings.catch_warnings():  # silences a pandas PerformanceWarning
        warnings.simplefilter("ignore")
        data.to_hdf(output_name, key="table", mode="w")
    logger().info(f"Output saved to {output_name}")
def main():
    """Entry point of the single-process script: configure the finder and
    process every SMILES in the input file."""
    smiles = "/Desktop/library_enumeration_repo/aizynthfinder/TRY_THESE.csv"
    config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
    policy = 'my_policy'
    stocks = []
    output = "output_RSP_wlib_mptest_300.hfdf5"
    log_to_file = True
    # NOTE(review): computed but never used in this function.
    multi_smiles = os.path.exists(smiles)
    file_level_logging = logging.DEBUG if log_to_file else None
    setup_logger(logging.INFO, file_level_logging)
    finder = AiZynthFinder(configfile=config_file)
    _select_stocks(finder, stocks)
    finder.policy.select_policy(policy or finder.policy.available_policies()[0])
    # BUG FIX: _process_multi_smiles takes (filename, finder, output_name);
    # the original call omitted `finder`, which raises a TypeError.
    _process_multi_smiles(smiles, finder, output)
这是我为实现多处理所做的更改。我也尝试过 multiprocessing.Pool,但每种方式似乎都卡在 finder.tree_search() 上:
# --- Module-level configuration for the multiprocessing attempt ---
# Input file with one SMILES string per line.
smiles ="/Desktop/library_enumeration_repo/aizynthfinder/smilestext.txt"
# AiZynthFinder configuration file.
config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
# Name of the policy to select on the finder.
policy = 'my_policy'
# Extra stock names; empty means "use all available stocks".
stocks = []
# Output file name.  NOTE(review): extension "hfdf5" looks like a typo for "hdf5".
output = "output_RSP_wlib_mptestPP.hfdf5"
# Whether to also log at DEBUG level to a file.
log_to_file =False
def _select_stocks(finder,"custom_stock")
stocks.append("custom_stock")
def _process_multi_smiles(queue, out_queue):
    """Worker: consume SMILES batches from *queue* until a ``None`` sentinel.

    Results of each batch are put on *out_queue* and also appended
    (``mode="a"``) to an HDF5 output file.

    Fixes over the pasted attempt:

    * ``finder`` was never defined in the child process (the parent never
      created it either), so the first use raised NameError -- each worker
      now builds and configures its own AiZynthFinder from the module-level
      config, since the finder cannot be shared between processes.
    * the ``while True`` loop had no exit condition, so workers blocked in
      ``queue.get()`` forever and the parent's ``join()`` never returned --
      a ``None`` sentinel now terminates the worker.
    * the bare ``except:`` with the no-op expression ``'ERROR'`` silently
      swallowed every error; failures are now logged per SMILES.
    * ``data.to_hdf(output_name, mode="a")`` was missing the required
      ``key`` argument.
    * ``queue.task_done()`` only exists on ``multiprocessing.JoinableQueue``;
      on a plain ``multiprocessing.Queue`` it raises AttributeError.

    :param queue: multiprocessing.Queue of SMILES batches (lists of str)
    :param out_queue: multiprocessing.Queue for the per-batch result dicts
    """
    output_name = "output_RSP_wlib_mptestPP.hfdf5"
    print(os.getpid(), "working")
    # Each process owns its finder: it holds model/tree state that cannot
    # be pickled or shared across processes.
    finder = AiZynthFinder(configfile=config_file)
    _select_stocks(finder, stocks)
    finder.policy.select_policy(policy or finder.policy.available_policies()[0])
    while True:
        item = queue.get(True)
        if item is None:  # sentinel: no more work, let join() succeed
            break
        print(os.getpid(), "got", item)
        results = defaultdict(list)
        for smi in item:
            print(smi)
            try:
                finder.target_smiles = smi
                finder.prepare_tree()
                search_time = finder.tree_search()
                print('done')
                finder.build_routes()
                stats = finder.extract_statistics()
                logger().info(f"Done with {smi} in {search_time:.3} s")
                for key, value in stats.items():
                    results[key].append(value)
                results["top_scores"].append(
                    ",".join("%.4f" % score for score in finder.routes.scores)
                )
                results["trees"].append(finder.routes.dicts)
            except Exception:
                # Log and continue with the next SMILES instead of dying.
                logger().exception(f"Search failed for {smi}")
        data = pd.DataFrame.from_dict(results)
        out_queue.put(results)
        with warnings.catch_warnings():  # suppress a pandas PerformanceWarning
            warnings.simplefilter("ignore")
            data.to_hdf(output_name, key="table", mode="a")
        logger().info(f"Output saved to {output_name}")
if __name__ == '__main__':
    smiles = "/Desktop/library_enumeration_repo/aizynthfinder/smilestext.txt"
    config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
    policy = 'my_policy'
    stocks = []
    output = "output_RSP_wlib_mptest7.hfdf5"
    log_to_file = True
    multi_smiles = os.path.exists(smiles)
    file_level_logging = logging.DEBUG if log_to_file else None
    # BUG FIX: the second argument is the file log level; the pasted code
    # passed `stocks` here by mistake.
    setup_logger(logging.INFO, file_level_logging)
    # NOTE(review): the pasted code called finder.policy.select_policy(...)
    # here, but `finder` was never created in this scope (NameError);
    # per-process finder configuration belongs in the worker instead.

    # Split the SMILES file into batches of n for the workers.
    n = 2
    with open(smiles) as f:
        smiles_list = [line.strip() for line in f]
    groups = [smiles_list[i * n:(i + 1) * n]
              for i in range((len(smiles_list) + n - 1) // n)]

    num_procs = 1
    out = multiprocessing.Queue()
    q = multiprocessing.Queue()
    procs = []
    for i in range(num_procs):
        p = multiprocessing.Process(
            target=_process_multi_smiles,
            name='Worker_' + str(i),
            args=(q, out),
        )
        p.start()
        procs.append(p)

    for item in groups:
        q.put(item)
    # BUG FIX: without a shutdown signal the workers block forever in
    # queue.get() and join() never returns -- send one sentinel per worker.
    for _ in procs:
        q.put(None)

    # BUG FIX: the pasted code did `output.get()` (output is a filename
    # string, not the queue) and iterated an undefined name `processes`.
    # Also drain the result queue *before* join(): a child cannot exit
    # while its queue buffer is still full, which would deadlock join().
    results = [out.get() for _ in groups]
    for p in procs:
        p.join()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。