微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python 多处理:每 k 次迭代写入文件

如何解决Python 多处理:每 k 次迭代写入文件

我使用 multiprocessing 中的 python 3.7 模块并行重复调用函数。我想每 k 次迭代将结果写到一个文件中。 (每次都可以是不同的文件。)

下面是我的第一次尝试,它基本上循环了函数参数集,并行运行每个集并将结果写入文件,然后再移动到下一个集。这显然是非常低效的。实际上,我的函数运行所需的时间要长得多,并且会因输入值而异,因此许多处理器在循环迭代之间处于空闲状态。

有没有更有效的方法来实现这一目标?

import multiprocessing as mp
import numpy as np
import pandas as pd

def myfunction(x): # toy example function
  return(x**2)

for start in np.arange(0,500,100):
    
    with mp.Pool(mp.cpu_count()) as pool:
        out = pool.map(myfunction,np.arange(start,start+100))
    
    pd.DataFrame(out).to_csv('filename_'+str(start//100+1)+'.csv',header=False,index=False) 

解决方法

我的第一个评论是,如果 myfunction 像您所展示的那样微不足道,那么使用多处理您的性能会更差,因为创建进程池会产生开销(顺便说一下,您不必要地创建在每次循环迭代中一遍又一遍)并将参数从一个进程传递到另一个进程。

假设 myfunction 是纯 CPU 并且在 map 返回 100 个值后,有机会重叠写入您没有利用的 CSV 文件(不清楚性能有多大)将通过并发磁盘写入得到改进;这取决于您拥有的驱动器类型、磁头移动等),那么多线程和多处理的组合可能是解决方案。假设 myfunction 为 100% CPU 且不释放全局解释器锁,因此处理池中的进程数将受限于 CPU 内核数,因此无法利用大于您拥有的 CPU 数量。无论如何,这是我的假设。例如,如果您打算使用某些 numpy 函数,那么这是一个错误的假设。另一方面,众所周知,numpy 对其自身的某些处理使用多处理,在这种情况下,使用 numpy 和您自己的多处理的组合可能会导致更差的性能。您当前的代码仅使用 numpy 来生成范围。这似乎有点矫枉过正,因为还有其他生成范围的方法。我通过定义 STARTSTOP 值以及 N_SPLITS(此范围的相等(或尽可能相等)的除法数)以稍微不同的方式自由地生成范围并生成可以转换为范围的起始值和终止值的元组。我希望这不会太令人困惑。但这似乎是一种更灵活的方法。

在下面的代码中,创建了一个线程池和一个处理池。任务被提交到线程池,其中一个参数是处理池,worker 使用它来进行 CPU 密集型计算,然后在组装结果后,worker 写出 CSV 文件。

from multiprocessing.pool import Pool,ThreadPool
from multiprocessing import cpu_count
import pandas as pd

def worker(process_pool,index,split_range):
    out = process_pool.map(myfunction,range(*split_range))
    pd.DataFrame(out).to_csv(f'filename_{index}.csv',header=False,index=False)

def myfunction(x): # toy example function
  return(x ** 2)

def split(start,stop,n):
    k,m = divmod(stop - start,n)
    return [(i * k + min(i,m),(i + 1) * k + min(i + 1,m)) for i in range(n)]

def main():
    RANGE_START = 0
    RANGE_STOP = 500
    N_SPLITS = 5
    n_processes = min(N_SPLITS,cpu_count())
    split_ranges = split(RANGE_START,RANGE_STOP,N_SPLITS) # [(0,100),(100,200),... (400,500)]
    process_pool = Pool(n_processes)
    thread_pool = ThreadPool(N_SPLITS)
    for index,split_range in enumerate(split_ranges):
        thread_pool.apply_async(worker,args=(process_pool,split_range))
    # wait for all threading tasks to complete:
    thread_pool.close()
    thread_pool.join()

# required for Windows:
if __name__ == '__main__':
    main()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。