如何解决如何从 Python 中的两对文件中获取枢轴线? 通用合并扫描和书写
从 How to get the pivot lines from two tab-separated files? 开始,有一种使用 unix 命令从两个文件中透视行的快速方法。
如果我们有两对文件:
-
f1a
和f1b
-
f2a
和f2b
- f1a / f2a
- f1b
- f2b
其中 f1a / f2a
是文件中同时出现在 f1a
和 f1b
中的行:
我尝试了以下可行的方法,但如果文件非常大,则存储 f1
和 f2
字典将需要大量内存。例如。数十亿行的文件。
import sys
from tqdm import tqdm
f1a,f1b,f2a,f2b = sys.argv[1],sys.argv[2],sys.argv[3],sys.argv[4]
# Read first pair of file into memory.
with open(f1a) as fin_f1a,open(f1a) as fin_f1b:
f1 = {s.strip().replace('\t',' ') :t.strip().replace('\t',' ') for s,t in tqdm(zip(fin_f1a,fin_f1b))}
with open(s2) as fin_f2a,open(t2) as fin_f2b:
f2 = {s.strip().replace('\t',t in tqdm(zip(fin_f2a,fin_f2b))}
with open('piVoted.tsv','w') as fout:
for s in tqdm(f1.keys() & f2.keys()):
print('\t'.join([s,f1[s],f2[s]]),end='\n',file=fout)
是否有更快/更好/更简单的方法来在 Python 中实现相同的 3 列制表符分隔文件?是否有可以对大文件有效执行此类操作的库?
使用 turicreate.SFrame
,我还可以:
from turicreate import SFrame
f1a,sys.argv[4]
sf1a = SFrame.read_csv(f1a,delimited='\0',header=False)
sf1b = SFrame.read_csv(f1b,header=False)
sf2a = SFrame.read_csv(f2a,header=False)
sf2b = SFrame.read_csv(f2b,header=False)
sf1 = sf1a.join(sf1b)
sf2 = sf2a.join(sf2b)
sf = sf1.join(sf2,on='X1',how='left')
sf.save('piVoted')
解决方法
通用合并
zip 函数不会存储可迭代对象的整个副本。所以我们可以安全地使用它。
假设您有两个按第一列升序排序的可迭代对象,您可以按如下方式连接两个表。
def merge(t1,t2):
end = object()
end_ = end,None
a1,b1 = next(t1,end_)
a2,b2 = next(t2,end_)
while a1 is not end and a2 is not end:
if a1 < a2:
a1,end_)
elif a1 > a2:
a2,end_)
else:
yield a1,b1,b2
a1,end_)
a2,end_)
使用两个迭代器调用合并并生成第三个迭代器,并且每次只需要存储每个迭代器的一个元素。
list(merge(iter([(0,1),(1,(3,2)]),iter([(0,'a'),'b'),(2,'c'),'d'),(4,'e')])))
[(0,1,2,'d')]
扫描和书写
为了防止整个文件被存储,我有一个扫描方法,它会读取每个文件一次yield
一行。
def scan(fa,fb):
for a,b in zip(fa,fb):
a = a.strip().replace('\t',' ')
b = b.strip().replace('\t',' ')
yield a,b
def scan_by_name(fa,fb):
with open(fa) as fha,open(fb) as fhb:
yield from scan(fha,fhb)
然后你可以用这种方式解决你的问题(未经测试,我没有你的文件)
with open('pivoted.tsv','w') as fout:
t1 = scan_by_name(f1a,f1b)
t2 = scan_by_name(f2a,f2b)
for row in merge(t1,t2):
print('\t'.join(row),end='\n',file=fout)
,
正如 leangaurav 所建议的,可以使用 Dask 来完成。
优点是我们可以制作一个在线程(或进程)池中运行并读取文件块(使用很少的 RAM)而不必担心这一点的解决方案。
例如,我们创建一些测试数据:
from string import ascii_lowercase
from random import choices
def rand_str(k=3):
return " ".join("".join(choices(ascii_lowercase,k=k)) for _ in range(2))
N = 2_000
for file_name in ["example_a.txt","example_b.txt"]:
with open(file_name,"w") as f:
for _ in range(N):
line = f"{rand_str()} \t {rand_str()}\n"
f.write(line)
然后我们用 dask 读取数据,我们指出哪一列将成为索引并进行合并:
from dask import compute
import dask.dataframe as dd
# this does not process anything yet
df_a = dd.read_csv("example_a.txt",sep="\t",names=["pivot","data"]).set_index("pivot")
df_b = dd.read_csv("example_b.txt","data"]).set_index("pivot")
# this is the heavy part
result = dask.compute(dd.merge(df_a,df_b,left_index=True,right_index=True))
# save the output
result[0].to_csv("out.txt",header=False)
在旧笔记本上针对不同 N 的一些测试(仅考虑计算步骤):
- N = 500_000 -> 11 秒
- N = 1_000_000 -> 25 秒
- N = 2_000_000 -> 44 秒
- N = 4_000_000 -> 1m33s
将立即评估对字典的理解。
如果您不喜欢流式传输,请尝试对数据进行分段。
def oneSegment(first_letter)
# Read first pair of file into memory.
with open(f1a) as fin_f1a,open(f1a) as fin_f1b:
f1 = {s.strip().replace('\t',' ') :t.strip().replace('\t',' ') for s,t in tqdm(zip(fin_f1a,fin_f1b)) if s.strip().replace('\t',' ').startwith(first_letter)}
with open(s2) as fin_f2a,open(t2) as fin_f2b:
f2 = {s.strip().replace('\t',t in tqdm(zip(fin_f2a,fin_f2b)) if s.strip().replace('\t',' ').startwith(first_letter)}
with open('pivoted.tsv','a') as fout:
for s in tqdm(f1.keys() & f2.keys()):
print('\t'.join([s,f1[s],f2[s]]),file=fout)
oneSegment("a")
,
此解决方案假设您在 linux 上运行 Python 代码,您可以通过 os.system
运行一系列 linux 命令来实现此目的。某些步骤可以合并为一个命令,也可以作为单独的进程并行运行以提高速度。
同一组中的每个任务可以并行运行。
(1,3,4),(5,6),(7,8),(9,10,11)
import os
f1a = "f1a"
f1b = "f1b"
f2a = "f2a"
f2b = "f2b"
# replace \t with space
os.system(f"sed -i 's/\t/ /g' {f1a}") #1
os.system(f"sed -i 's/\t/ /g' {f1b}") #2
os.system(f"sed -i 's/\t/ /g' {f2a}") #3
os.system(f"sed -i 's/\t/ /g' {f2b}") #4
# join the pair of files with \t
os.system(f"paste {f1a} {f1b} > f1_t") #5 join f1a and f1b with \t delimiter
os.system(f"paste {f2a} {f2b} > f2_t") #6 join f1a and f1b with \t delimiter
# prepare data for join: sort the files
os.system(f"sort f1_t > f1") #7
os.system(f"sort f2_t > f2") #8
os.system(f"join f1 f2 -j 1 -t '\t'> f12") #9 lines common to both f1,f2: -j 1
os.system(f"join f1 f2 -v 1 -j 1 -t '\t'> f11") #10 lines present only in f1 : -v 1
os.system(f"join f1 f2 -v 2 -j 1 -t '\t'> f22") #11 lines present only in f2 : -v 2
os.system(f"cat f12 f11 f22 > f") #12 join into final result f
由于问题提到的是 python 而不是平台,对于平台中立方法,请查看 dask。
这个answer解决了一部分问题,就是合并大文件。但是加入文件集等仍然需要弄清楚,Dask也应该可以做到。或者预处理可以通过上面的代码(甚至代码之外)完成,合并可以用 Dask 完成。另请检查 this 答案。
在处理大数据块时,排序使事情变得更好。
在 this dask 文档中查看有关 indexes
的详细信息。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。