如何解决Python 中的并行生成器目标
我正在寻找一种方法来实现一个内存高效的例程,以一次性填充两个 Pandas DataFrame(然而,我的问题不是关于 Pandas,问题是 DataFrames 似乎在从生成器,而不是通过外部连接条目)。
我有一个手写的扩展名,可以从一个大文件中获取条目。每个条目由“向量部分”和(一个或多个)“表格部分”组成。具有单个“表部分”的代表性模拟示例如下
data = [
((101,102),[ (0.1,11,2),(0.2,12,3) ]),((102,103),[ (0.3,21,1),(0.4,22,(0.5,23,((105,106),[ (0.6,32,3) ])
# ...
]
我想要两个生成器,我可以将它们传递给 DataFrame 构造函数,它们将产生两个序列:一个由“向量部分”(模拟示例中的 3x2)组成,逐行和其他由单个条目组成表格(模拟示例中为 6x3):
最重要的是我想一口气生产出来。下面的代码片段可以以某种方式描述预期的结果,除了此代码只是打印行而不是将它们转发到提供给 DataFrame 构造函数的生成器的重要部分。
def dispatch_data(data,cVect,cTable):
cVect.send(None)
cTable.send(None)
for vect,table in data:
cVect.send(vect)
for line in table:
cTable.send(line)
def connector():
while True:
datum = (yield)
print(datum)
#yield datum
c1,c2 = connector(),connector()
#df1,df2 = pd.DataFrame(c1),pd.FataFrame(c2)
dispatch_data(data,c1,c2)
如果取消注释注释行,该脚本会陷入无限循环,原因很明显,yield
中的两个 connector()
都被 send()
调用释放。
甚至可以在 Python 中将这样的数据流“拆分”到两个(或更多)生成器上吗?
更新
我找到了一个解决方案,它在没有线程或复杂的同步原语并且非常高效的意义上非常适合我。好的,我们这里有一个 Queue
,但它是针对 greenlets 的,我发现了一个非常微妙的性能足迹。
def dataframe_builder(q):
def _get_item():
yield from q
return pd.DataFrame( _get_item() )
qVect,qTable = gevent.queue.Queue(100) #< one may tune this values,gevent.queue.Queue(1000) #< for best performance
tVect,tTable = gevent.spawn(dataframe_builder,qVect),gevent.spawn(dataframe_builder,qTable)
for vect,table in data:
qVect.put(vect)
for line in table:
qTable.put(line)
qVect.put(stopiteration)
qTable.put(stopiteration)
tVect.join()
tTable.join()
现在——如何用原生 Python 方式实现类似的结果?例如,我多次遇到过 gevent
与 asyncio
实际上相似的声明,那么如何使用 asyncio
实现这种场景?我仍然认为这是一个相当通用和基本的模式。
解决方法
一种可能性是通过首先使用纯迭代器处理较大“主”表的创建,然后生成较小的向量表来对数据进行分块。这样,只有较小的向量表会加载到内存中,而较大的表则完全被视为迭代器:
import pandas as pd
from collections import deque
def transform_entries(data):
for a,b in data:
yield ('vector',a)
yield from (('table',i) for i in b)
class iter_table:
def __init__(self,iter_t):
self.iter_t = iter_t
self.vector_t,self.flag = deque(),False
def __iter__(self):
return self
def __next__(self):
if not self.flag:
while True:
if (n:=next(self.iter_t,None)) is None:
self.flag = True
raise StopIteration
if n[0] == 'table':
return n[-1]
self.vector_t.append(n[-1])
else:
if not self.vector_t:
raise StopIteration
return self.vector_t.popleft()
data = [((101,102),[(0.1,11,2),(0.2,12,3)]),((102,103),[(0.3,21,1),(0.4,22,(0.5,23,((105,106),[(0.6,32,3)])]
c = iter_table(transform_entries(data))
df_table,df_vector = pd.DataFrame(c),pd.DataFrame(c)
print(df_table)
print(df_vector)
输出:
#main table
0 1 2
0 0.1 11 2
1 0.2 12 3
2 0.3 21 1
3 0.4 22 2
4 0.5 23 3
5 0.6 32 3
#vector table
0 1
0 101 102
1 102 103
2 105 106
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。