加速读取压缩的 bz2 文件'rb' 模式

如何解决加速读取压缩的 bz2 文件'rb' 模式

我有一个超过 10GB 的 BZ2 文件。我想在不解压成临时文件的情况下阅读它(它会超过 50GB)。

使用这种方法:

import bz2,time
t0 = time.time()
time.sleep(0.001) # to avoid / by 0
with bz2.open("F:\test.bz2",'rb') as f:
    for i,l in enumerate(f):
        if i % 100000 == 0:
            print('%i lines/sec' % (i/(time.time() - t0)))

我每秒只能读取约 25 万行。在类似的文件中,首先解压,我每秒得到大约 3M 行,即 x10 因子:

with open("F:\test.txt",'rb') as f:

我认为这不仅是由于固有的解压 CPU 时间(因为解压到临时文件的总时间 + 作为未压缩文件的读取比此处描述的方法小得多),而且可能是缺少缓冲,或者其他原因。 bz2.open 还有其他更快的 Python 实现吗?

如何在二进制模式下加速读取 BZ2 文件,并循环“行”?(以 \n 分隔)

注意:目前 time to decompress test.bz2 into test.tmp + time to iterate over lines of test.tmp 远小于 time to iterate over lines of bz2.open('test.bz2'),这可能不应该是这种情况。

链接主题:https://discuss.python.org/t/non-optimal-bz2-reading-speed/6869

解决方法

您可以使用 BZ2Decompressor 来处理大文件。它以增量方式解压缩数据块,开箱即用:

t0 = time.time()
time.sleep(0.000001)
with open('temp.bz2','rb') as fi:
    decomp = bz2.BZ2Decompressor()
    residue = b''
    total_lines = 0
    for data in iter(lambda: fi.read(100 * 1024),b''):
        raw = residue + decomp.decompress(data) # process the raw data and  concatenate residual of the previous block to the beginning of the current raw data block
        residue = b''
        # process_data(current_block) => do the processing of the current data block
        current_block = raw.split(b'\n')
        if raw[-1] != b'\n':
            residue = current_block.pop() # last line could be incomplete
        total_lines += len(current_block)
        print('%i lines/sec' % (total_lines / (time.time() - t0)))
    # process_data(residue) => now finish processing the last line
    total_lines += 1
    print('Final: %i lines/sec' % (total_lines / (time.time() - t0)))

在这里,我读取了一大块二进制文件,将其送入解压缩器并接收一大块解压缩数据。请注意,解压后的数据块必须连接起来才能恢复原始数据。这就是为什么最后一个条目需要特殊处理。

在我的实验中,它的运行速度比使用 io.BytesIO() 的解决方案快一点。众所周知,bz2 速度很慢,因此如果您感到困扰,可以考虑迁移到 snappyzstandard

关于在 Python 中处理 bz2 所需的时间。使用 Linux 实用程序将文件解压缩为临时文件,然后处理普通文本文件可能是最快的。否则,您将依赖 Python 的 bz2 实现。

,

这种方法已经比原生 bz2.open 提高了 2 倍。

import bz2,time,io

def chunked_readlines(f):
    s = io.BytesIO()
    while True:
        buf = f.read(1024*1024)
        if not buf:
            return s.getvalue()
        s.write(buf)
        s.seek(0)
        L = s.readlines()
        yield from L[:-1]
        s = io.BytesIO()
        s.write(L[-1])  # very important: the last line read in the 1 MB chunk might be
                        # incomplete,so we keep it to be processed in the next iteration
                        # TODO: check if this is ok if f.read() stopped in the middle of a \r\n?

t0 = time.time()
i = 0
with bz2.open("D:\test.bz2",'rb') as f:
    for l in chunked_readlines(f):       # 500k lines per second
    # for l in f:                        # 250k lines per second
        i += 1
        if i % 100000 == 0:
            print('%i lines/sec' % (i/(time.time() - t0)))

也许可以做得更好。

如果我们可以将 s 用作简单的 bytes 对象而不是 io.BytesIO,我们可以获得 x4 的改进。但不幸的是,在这种情况下,splitlines() 的行为不符合预期:splitlines() and iterating over an opened file give different results

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res