微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Pool,Processs或Queue无法进行多重处理

如何解决使用Pool,Processs或Queue无法进行多重处理

我正在使用一个脚本,该脚本从文本文件中读取化学化合物,该文件很大,并且我正尝试实现多处理,因此该文本文件被分块处理,但由于某种原因,它似乎无法正常工作,并且永远都无法进入树脚本中的搜索行。我尝试过multiprocessing.Pool,Queue和Process的方式不同,但是我不确定哪里出错了

这是原始脚本:


def _select_stocks(finder,stocks):
    stocks = list(stocks)
    try:
        module = importlib.import_module("custom_stock")
    except ModuleNotFoundError:
        pass
    else:
        if hasattr(module,"stock"):
            finder.stock.load_stock(module.stock,"custom_stock")
            stocks.append("custom_stock")
    finder.stock.select_stocks(stocks or finder.stock.available_stocks())

def _process_multi_smiles(filename,finder,output_name):
    output_name = output_name or "output.hdf5"
    with open(filename,"r") as fileobj:
        smiles = [line.strip() for line in fileobj.readlines()]

    results = defaultdict(list)
    for smi in smiles:
        finder.target_smiles = smi
        finder.prepare_tree()
        search_time = finder.tree_search()
        finder.build_routes()
        stats = finder.extract_statistics()

        logger().info(f"Done with {smi} in {search_time:.3} s")
        for key,value in stats.items():
            results[key].append(value)
        results["top_scores"].append(
            ",".join("%.4f" % score for score in finder.routes.scores)
        )
        results["trees"].append(finder.routes.dicts)

    data = pd.DataFrame.from_dict(results)
    with warnings.catch_warnings():  # This wil supress a PerformanceWarning
        warnings.simplefilter("ignore")
        data.to_hdf(output_name,key="table",mode="w")
    logger().info(f"Output saved to {output_name}")

def main():
    smiles = "/Desktop/library_enumeration_repo/aizynthfinder/TRY_THESE.csv"
    config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
    policy = 'my_policy'
    stocks = []
    output = "output_RSP_wlib_mptest_300.hfdf5"
    log_to_file =True

    multi_smiles = os.path.exists(smiles)

    file_level_logging = logging.DEBUG if log_to_file else None
    setup_logger(logging.INFO,file_level_logging)

    finder = AiZynthFinder(configfile=config_file)
    _select_stocks(finder,stocks)
    finder.policy.select_policy(policy or finder.policy.available_policies()[0])

    _process_multi_smiles(smiles,output)

这是我为实现多处理所做的更改,我尝试了多处理。也有池,但是每个似乎都卡在finder.tree_search()

smiles ="/Desktop/library_enumeration_repo/aizynthfinder/smilestext.txt"
config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
policy = 'my_policy'
stocks = []
output = "output_RSP_wlib_mptestPP.hfdf5"
log_to_file =False


def _select_stocks(finder,"custom_stock")
            stocks.append("custom_stock")

def _process_multi_smiles(queue,out_queue):
    output_name = "output_RSP_wlib_mptestPP.hfdf5"
    print (os.getpid(),"working")
    while True:
            item = queue.get(True)
            print (os.getpid(),"got",item)
            results = defaultdict(list)
            for smi in item:
                print(smi)
                finder.target_smiles = smi
                try:
                    finder.prepare_tree()
                    search_time = finder.tree_search()
                    print('done')
                    finder.build_routes()
                    stats = finder.extract_statistics()

                    logger().info(f"Done with {smi} in {search_time:.3} s")
                    for key,value in stats.items():
                        results[key].append(value)
                    results["top_scores"].append(
                        ",".join("%.4f" % score for score in finder.routes.scores)
                    )
                    results["trees"].append(finder.routes.dicts)
                except:
                    'ERROR'
                

            data = pd.DataFrame.from_dict(results)
            out_queue.put(results)
            
            with warnings.catch_warnings():  # This wil supress a PerformanceWarning
                warnings.simplefilter("ignore")
                data.to_hdf(output_name,mode="a")
            logger().info(f"Output saved to {output_name}")
            queue.task_done()


if __name__ == '__main__':
    smiles ="/Desktop/library_enumeration_repo/aizynthfinder/smilestext.txt"
    config_file = "/Desktop/library_enumeration_repo/aizynthfinder/config.yml"
    policy = 'my_policy'
    stocks = []
    output = "output_RSP_wlib_mptest7.hfdf5"
    log_to_file = True

    multi_smiles = os.path.exists(smiles)

    file_level_logging = logging.DEBUG if log_to_file else None
    setup_logger(logging.INFO,stocks)
    finder.policy.select_policy(policy or finder.policy.available_policies()[0])

    n = 2
   
    with open(smiles) as f:
        smiles = [line.strip() for line in f.readlines()]
    groups = [smiles[i * n:(i + 1) * n] for i in range((len(smiles) + n - 1) // n)]
    
    num_procs = 1
    out = multiprocessing.Queue()
    q = multiprocessing.Queue()
    
    procs = []
    
    for i in range(num_procs):
        nameStr = 'Worker_'+str(i)
        p = multiprocessing.Process(target=_process_multi_smiles,args=(q,out))
        p.start()
        procs.append(p)
        
    for item in groups:
        q.put(item)
        
    for i in procs:
        i.join()
    
    results = [output.get() for p in processes]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。