环境
我正在使用Python 2.7
Ubuntu 16.04
问题
>从多个数据源(HTTP请求,系统信息等)收集数据
>根据此数据计算指标
>以各种格式输出这些指标
在进入下一阶段之前,每个阶段都必须完成,但是每个阶段都包含多个可以并行运行的子任务(我可以发送3个HTTP请求并在等待它们返回时读取系统日志)
我已经将阶段划分为模块,将子任务划分为子模块,因此我的项目层次结构如下:
+ datasources |-- __init__.py |-- data_one.py |-- data_two.py |-- data_three.py + metrics |-- __init__.py |-- metric_one.py |-- metric_two.py + outputs |-- output_one.py |-- output_two.py - app.py
app.py看起来大致如此(伪代码简洁):
import datasources import metrics import outputs for datasource in dir(datasources): datasource.refresh() for metric in dir(metrics): metric.calculate() for output in dir(outputs): output.dump()
(包含dir调用的附加代码忽略系统模块,有异常处理等 – 但这是它的要点)
每个数据源子模块看起来大致如下:
data = [] def refresh(): # Populate the "data" member somehow data = [1,2,3] return
每个度量子模块看起来大致如下:
import datasources.data_one as data_one import datasources.data_two as data_two data = [] def calculate(): # Use the datasources to compute the metric data = [sum(x) for x in zip(data_one,data_two)] return
为了并行化第一阶段(数据源),我写了一些简单的东西,如下所示:
def run_thread(datasource): datasource.refresh() threads = [] for datasource in dir(datasources): thread = threading.Thread(target=run_thread,args=(datasource)) threads.append(thread) thread.start() for thread in threads: thread.join()
这可以工作,然后我可以计算任何指标,并填充datasources.x.data属性
为了并行化第二阶段(度量),因为它更少依赖于I / O而更多地依赖于cpu,我觉得简单的线程实际上不会加速,我需要多处理模块才能利用多核.我写了以下内容:
def run_pool(calculate): calculate() pool = multiprocessing.Pool() pool.map(run_pool,[m.calculate for m in dir(metrics)] pool.close() pool.join()
这段代码运行了几秒钟(所以我认为它有效吗?)但是当我尝试时:
metrics.metric_one.data
它返回[],就像模块从未运行过一样
不知何故,通过使用多处理模块,它似乎是对线程进行限定,以便它们不再共享数据属性.我应该如何重写这一点,以便我可以并行计算每个指标,利用多个核心,但在完成后仍然可以访问数据?
解决方法
因为你在2.7,并且你正在处理模块而不是对象,所以你在挑选你需要的东西时遇到了问题.解决方法并不漂亮.它涉及将每个模块的名称传递给您的操作功能.我更新了部分部分,并且还更新了删除with语法.
一些东西:
首先,一般来说,多线程比线程更好.使用线程,您总是冒着处理Global Interpreter Lock的风险,这可能效率极低.如果您使用多核,这将成为一个非问题.
其次,你有正确的概念,但是通过拥有一个全局到模块的数据成员,你会让它变得奇怪.使您的源返回您感兴趣的数据,并使您的度量(和输出)将数据列表作为输入并输出结果列表.
这会将你的伪代码变成这样的东西:
app.py:
import datasources import metrics import outputs pool = multiprocessing.Pool() data_list = pool.map(lambda o: o.refresh,list(dir(datasources))) pool.close() pool.join() pool = multiprocessing.Pool() metrics_funcs = [(m,data_list) for m in dir(metrics)] metrics_list = pool.map(lambda m: m[0].calculate(m[1]),metrics_funcs) pool.close() pool.join() pool = multiprocessing.Pool() output_funcs = [(o,data_list,metrics_list) for o in dir(outputs)] output_list = pool.map(lambda o: o[0].dump(o[1],o[2]),output_funcs) pool.close() pool.join()
完成此操作后,您的数据源将如下所示:
def refresh(): # Populate the "data" member somehow return [1,3]
您的指标看起来像这样:
def calculate(data_list): # Use the datasources to compute the metric return [sum(x) for x in zip(data_list)]
最后,您的输出可能如下所示:
def dump(data_list,metrics_list): # do whatever; you Now have all the information
删除数据“全局”并传递它使每个部分更清洁(并且更容易测试).这突出了使每件作品完全独立.正如你所看到的,我正在做的就是改变传递给map的列表中的内容,在这种情况下,我通过将它们作为元组传递并在函数中解压缩来注入所有先前的计算.当然,你不必使用lambdas.您可以单独定义每个函数,但实际上并没有太多定义.但是,如果确实定义了每个函数,则可以使用partial函数来减少传递的参数数量.我经常使用这种模式,在你更复杂的代码中,你可能需要.这是一个例子:
from functools import partial do_dump(module_name,metrics_list): globals()[module_name].dump(data_list,metrics_list) invoke = partial(do_dump,data_list=data_list,metrics_list=metrics_list) with multiprocessing.Pool() as pool: output_list = pool.map(invoke,[o.__name__ for o in dir(outputs)]) pool.close() pool.join()
根据评论更新:
当您使用map时,您可以保证输入的顺序与输出的顺序相匹配,即data_list [i]是运行dir(datasources)[i] .refresh()的输出.我不会将数据源模块导入指标,而是将其更改为app.py:
data_list = ... pool.close() pool.join() data_map = {name: data_list[i] for i,name in enumerate(dir(datasources))}
然后将data_map传递给每个指标.然后,度量标准按名称获取它想要的数据,例如
d1 = data_map['data_one'] d2 = data_map['data_two'] return [sum(x) for x in zip([d1,d2])]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。