微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Python 中的并行生成器目标

如何解决Python 中的并行生成器目标

我正在寻找一种方法来实现一个内存高效的例程,以一次性填充两个 Pandas DataFrame(然而,我的问题不是关于 Pandas,问题是 DataFrames 似乎在从生成器,而不是通过外部连接条目)。

我有一个手写的扩展名,可以从一个文件获取条目。每个条目由“向量部分”和(一个或多个)“表格部分”组成。具有单个“表部分”的代表性模拟示例如下

data = [
    ((101,102),[               (0.1,11,2),(0.2,12,3) ]),((102,103),[ (0.3,21,1),(0.4,22,(0.5,23,((105,106),[                             (0.6,32,3) ])
    # ...
]

我想要两个生成器,我可以将它们传递给 DataFrame 构造函数,它们将产生两个序列:一个由“向量部分”(模拟示例中的 3x2)组成,逐行和其他由单个条目组成表格(模拟示例中为 6x3):

Vector dataframe

Table dataframe

最重要的是我想一口气生产出来。下面的代码片段可以以某种方式描述预期的结果,除了此代码只是打印行而不是将它们转发到提供给 DataFrame 构造函数生成器的重要部分。

def dispatch_data(data,cVect,cTable):
    cVect.send(None)
    cTable.send(None)
    for vect,table in data:
        cVect.send(vect)
        for line in table:
            cTable.send(line)

def connector():
    while True:
        datum = (yield)
        print(datum)
        #yield datum

c1,c2 = connector(),connector()
#df1,df2 = pd.DataFrame(c1),pd.FataFrame(c2)
dispatch_data(data,c1,c2)

如果取消注释注释行,该脚本会陷入无限循环,原因很明显,yield 中的两个 connector() 都被 send() 调用释放。

甚至可以在 Python 中将这样的数据流“拆分”到两个(或更多)生成器上吗?


更新

我找到了一个解决方案,它在没有线程或复杂的同步原语并且非常高效的意义上非常适合我。好的,我们这里有一个 Queue,但它是针对 greenlets 的,我发现了一个非常微妙的性能足迹。

它利用了 gevent 模块,演示代码实际上非常简单:

def dataframe_builder(q):
    def _get_item():
        yield from q
    return pd.DataFrame( _get_item() )

qVect,qTable = gevent.queue.Queue(100)  #< one may tune this values,gevent.queue.Queue(1000) #< for best performance

tVect,tTable = gevent.spawn(dataframe_builder,qVect),gevent.spawn(dataframe_builder,qTable)

for vect,table in data:
    qVect.put(vect)
    for line in table:
        qTable.put(line)
qVect.put(stopiteration)
qTable.put(stopiteration)

tVect.join()
tTable.join()

现在——如何用原生 Python 方式实现类似的结果?例如,我多次遇到过 geventasyncio 实际上相似的声明,那么如何使用 asyncio 实现这种场景?我仍然认为这是一个相当通用和基本的模式。

解决方法

一种可能性是通过首先使用纯迭代器处理较大“主”表的创建,然后生成较小的向量表来对数据进行分块。这样,只有较小的向量表会加载到内存中,而较大的表则完全被视为迭代器:

import pandas as pd
from collections import deque
def transform_entries(data):
   for a,b in data:
      yield ('vector',a)
      yield from (('table',i) for i in b)

class iter_table:
   def __init__(self,iter_t):
      self.iter_t = iter_t
      self.vector_t,self.flag = deque(),False
   def __iter__(self):
      return self
   def __next__(self):
      if not self.flag:
         while True:
            if (n:=next(self.iter_t,None)) is None:
                self.flag = True
                raise StopIteration
            if n[0] == 'table':
                return n[-1]
            self.vector_t.append(n[-1])
      else:
          if not self.vector_t:
              raise StopIteration
          return self.vector_t.popleft()
             

data = [((101,102),[(0.1,11,2),(0.2,12,3)]),((102,103),[(0.3,21,1),(0.4,22,(0.5,23,((105,106),[(0.6,32,3)])]
c = iter_table(transform_entries(data))
df_table,df_vector = pd.DataFrame(c),pd.DataFrame(c)
print(df_table)
print(df_vector)

输出:

#main table
     0   1  2
0  0.1  11  2
1  0.2  12  3
2  0.3  21  1
3  0.4  22  2
4  0.5  23  3
5  0.6  32  3
#vector table
     0    1
0  101  102
1  102  103
2  105  106

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。