微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用多处理并行使用 python gneerator?

如何解决如何使用多处理并行使用 python gneerator?

如何提高 networkx 函数性能 local_bridges https://networkx.org/documentation/stable//reference/algorithms/generated/networkx.algorithms.bridges.local_bridges.html#networkx.algorithms.bridges.local_bridges

我已经尝试过使用 pypy - 但到目前为止我仍然坚持在单核上使用生成器。我的图有 30 万条边。一个例子:

# construct the nx Graph:
import networkx as nx
# construct an undirected graph here - this is just a dummy graph
G = nx.cycle_graph(300000)

# fast - as it only returns an generator/iterator
lb = nx.local_bridges(G)

# individual item is also fast
%%time
next(lb)
cpu times: user 1.01 s,sys: 11 ms,total: 1.02 s
Wall time: 1.02 s

# computing all the values is very slow.
lb_list = list(lb)

如何并行使用此迭代器以利用所有处理器内核?当前的简单实现仅使用单个内核!

我天真的多线程第一次尝试是:

import multiprocessing as mp
lb = nx.local_bridges(G)
pool = mp.Pool()
lb_list = list(pool.map((),lb))

但是,我不想应用特定的函数 - () 而只是从迭代器并行获取 next 元素。

相关: python or dask parallel generator?

编辑

我想归结为如何并行化:

lb_res = []
lb = nx.local_bridges(G)
for node in range(1,len(G) +1):
    lb_res.append(next(lb))
    
lb_res

天真地使用多处理显然失败了:

# from multiprocessing import Pool
# https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror
from multiprocess import Pool
lb_res = []
lb = nx.local_bridges(G)

def my_function(thing):
    return next(thing)

with Pool(5) as p:
    parallel_result = p.map(my_function,range(1,len(G) +1))
    
parallel_result

但我不清楚如何将我的生成器作为参数传递给 map 函数 - 并完全消耗生成器。

编辑 2

对于这个特定问题,事实证明瓶颈是 with_span=True 参数的最短路径计算。禁用时,速度相当快。

当需要计算跨度时,我建议 cugraph 在 GPU 上快速实现 Sssp。尽管如此,这组边上的迭代并不是并行发生的,应该进一步改进。

但是,要了解更多信息,我有兴趣了解如何在 python 中并行化生成器的消耗。

解决方法

您不能并行使用生成器,每个非平凡生成器的下一个状态由其当前状态决定。您必须按顺序调用 next()

https://github.com/networkx/networkx/blob/master/networkx/algorithms/bridges.py#L162 这是函数的实现方式

for u,v in G.edges:
    if not (set(G[u]) & set(G[v])):
        yield u,v

所以你可以使用类似这样的东西来并行化它,但是你将不得不承担使用类似 multiprocessing.Manager 的东西合并这些单个列表的惩罚。我认为这只会让整个过程变慢,但你可以自己计时。

def process_edge(e):
    u,v = e
    lb_list = []
    if not (set(G[u]) & set(G[v])):
        lb_list.append((u,v))
with Pool(os.cpu_count()) as pool:
    pool.map(process_edge,G.edges)

另一种方法是将图拆分为顶点范围并同时处理它们。

def process_nodes(nodes):
    lb_list = []
    for u in nodes:
        for v in G[u]:
            if not (set(G[u]) & set(G[v])):
                lb_list.append((u,v))

with Pool(os.cpu_count()) as pool:
    pool.map(process_nodes,np.array_split(list(range(G.number_of_nodes())),os.cpu_count()))

也许您还可以检查是否存在针对此问题的更好算法。或者找一个用 C 实现的更快的库。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。