池工作者的Python多进程 – 内存使用优化

我有一个模糊的字符串匹配脚本,在400万公司名称的大海捞针中寻找大约30K针.虽然脚本工作正常,但我在AWS h1.xlarge上通过并行处理加速处理的尝试失败了,因为我的内存不足.

我不想试图获得更多的内存,如回应my previous question所述,我想找出如何优化工作流程 – 我对此很新,所以应该有足够的空间.顺便说一句,我已经尝试过queues(也工作过,但遇到了同样的MemoryError,再看了一堆非常有用的SO贡献,但还没到那里.

这是与代码最相关的内容.我希望它足以澄清逻辑 – 很高兴根据需要提供更多信息:

def getHayStack():
    ## loads a few million company names into id: name dict
    return hayCompanies

def getNeedles(*args):
    ## loads subset of 30K companies into id: name dict (for allocation to workers)
    return needleCompanies

def findNeedle(needle,haystack):
    """ Identify best match and return results with score """
    results = {}
    for hayID,hayCompany in haystack.iteritems():
        if not isnull(haystack[hayID]):
            results[hayID] = levi.setratio(needle.split(' '),hayCompany.split(' '))
    scores = list(results.values())
    resultIDs = list(results.keys())
    needleID = resultIDs[scores.index(max(scores))]
    return [needleID,haystack[needleID],max(scores)]

def runMatch(args):
    """ Execute findNeedle and process results for poolWorker batch"""
    batch,first = args
    last = first + batch
    hayCompanies = getHayStack()
    needleCompanies = getTargets(first,last)
    needles = defaultdict(list)
    current = first
    for needleID,needleCompany in needleCompanies.iteritems():
        current += 1
        needles[targetID] = findNeedle(needleCompany,hayCompanies)
    ## Then store results

if __name__ == '__main__':
    pool = Pool(processes = numProcesses)
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    pool.map_async(runMatch,itertools.izip(itertools.repeat(targetsPerBatch),xrange(0,totalTargets,targetsPerBatch))).get(99999999)
    pool.close()
    pool.join()

所以我想问题是:我怎样才能避免为所有工人加载大海捞针 – 例如通过分享数据或采取不同的方法,例如将更大的干草堆划分为工人而不是针头?如何通过避免或消除混乱来改善内存使用?

最佳答案
你的设计有点令人困惑.您正在使用N个工作池,然后将M个工作分解为N个大小为M / N的任务.换句话说,如果你完全正确,那么你就是在工作流程之上构建的池之上模拟工作进程.为什么要这么麻烦?如果要使用进程,只需直接使用它们即可.或者,将池用作池,将每个作业作为自己的任务发送,并使用批处理功能以适当(和可调整)的方式批处理它们.

这意味着runMatch只需要一个needleID和needleCompany,它所做的就是调用findNeedle然后执行#Then store结果部分.然后主程序变得更简单:

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        results = pool.map_async(runMatch,needleCompanies.iteritems(),chunkSize=NUMBER_TWEAKED_IN_TESTING).get()

或者,如果结果很小,而不是让所有进程(可能)都在争夺一些共享的结果存储事物,而只是返回它们.那么你根本就不需要runMatch,只需:

if __name__ == '__main__':
    with Pool(processes=numProcesses) as pool:
        for result in pool.imap_unordered(findNeedle,chunkSize=NUMBER_TWEAKED_IN_TESTING):
            # Store result

或者,如果您确实想要完成N个批次,只需为每个批次创建一个流程:

if __name__ == '__main__':
    totalTargets = len(getTargets('all'))
    targetsPerBatch = totalTargets / numProcesses
    processes = [Process(target=runMatch,args=(targetsPerBatch,targetsPerBatch))) 
                 for _ in range(numProcesses)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

此外,您似乎每次为每个任务调用getHayStack()(以及getNeedles).我不确定在同一时间最终获得这个实时的多个副本是多么容易,但考虑到它是迄今为止最大的数据结构,这将是我试图排除的第一件事.实际上,即使它不是内存使用问题,getHayStack也很容易成为一个重大的性能损失,除非你已经在进行某种缓存(例如,第一次将它显式存储在全局或可变的默认参数值中),然后只是使用它),所以它可能值得修复.

一次解决两个潜在问题的一种方法是在Pool构造函数中使用初始化程序:

def initPool():
    global _haystack
    _haystack = getHayStack()

def runMatch(args):
    global _haystack
    # ...
    hayCompanies = _haystack
    # ...

if __name__ == '__main__':
    pool = Pool(processes=numProcesses,initializer=initPool)
    # ...

接下来,我注意到您在多个地方显式生成列表,而实际上并不需要它们.例如:

scores = list(results.values())
resultIDs = list(results.keys())
needleID = resultIDs[scores.index(max(scores))]
return [needleID,max(scores)]

如果有一些结果,这是浪费;只需直接使用results.values()迭代. (事实上​​,看起来你正在使用Python 2.x,在这种情况下,键和值已经是列表,所以你只是在没有充分理由的情况下制作一个额外的副本.)

但在这种情况下,你可以进一步简化整个事情.你只是寻找得分最高的关键(resultID)和值(得分),对吧?所以:

needleID,score = max(results.items(),key=operator.itemgetter(1))
return [needleID,score]

这也消除了对得分的所有重复搜索,这应该节省一些CPU.

这可能无法直接解决内存问题,但应该可以更容易地进行调试和/或调整.

首先要尝试的是使用更小的批次 – 而不是input_size / cpu_count,尝试1.内存使用量是否下降?如果没有,我们已经排除了这一部分.

接下来,尝试sys.getsizeof(_haystack)并查看它的内容.如果它是1.6GB,那么你正在削减一些东西,试图将其他所有东西都压缩到0.4GB,这就是攻击它的方式 – 例如,使用shelve数据库而不是简单的dict.

还尝试在初始化函数的开始和结束时转储内存使用(使用resource模块,getrusage(RUSAGE_SELF)).如果最终的干草堆只有0.3GB,但你又分配了1.3GB,这就是攻击的问题.例如,你可以分离一个子进程来构建和pickle dict,然后让池初始化器打开它并取消它.或者在第一个子节点中组合两个构建一个搁置数据库,并在初始化程序中以只读方式打开它.无论哪种方式,这也意味着您只进行一次CSV解析/字典构建工作而不是8次.

另一方面,如果您的总VM使用率仍然很低(请注意,当第一个任务运行时,getrusage没有任何方式直接看到您的总VM大小-ru_maxrss通常是一个有用的近似值,特别是如果ru_nswap为0),问题在于任务本身.

首先,获取任务函数的参数和返回的值.如果它们很大,特别是如果它们要么随着每个任务变得越来越大或者变化很大,那么它可能只是腌制和解开数据会占用太多内存,最终它们中的8个一起大到足以达到极限.

否则,问题很可能出在任务函数本身.要么你有内存泄漏(你只能通过使用有缺陷的C扩展模块或ctypes进行真正的泄漏,但是如果你在调用之间保留任何引用,例如,在全局中,你可能只是永远保持事物不必要地),或者某些任务本身会占用太多内存.无论哪种方式,这应该是你可以通过拉出多处理并直接运行任务来更容易测试的东西,这样更容易调试.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


前言 目前有个python应用需要在容器镜像内拉取git私有仓库的代码,一开始的想法是用GitPython,折腾一番ssh私钥和known_hosts问题后,发现还是在镜像中封装个git最省事,然后用subprocess调用系统命令,镜像体积也没有想象中增加特别多。 准备ssh私钥和known_ho
前言 当网络不稳定或应用页面加载有问题,可以设置等待,避免网络问题导致找不到元素等异常。 隐式等待 隐式等待设置的是最长等待时间,如果在规定时间内网页加载完成,则执行下一步,否则一直等到时间结束。 隐式等待在driver的整个生命周期都有效,初始化的时候设置一次即可。 # 隐式等待10秒 drive
前言 map()、reduce()、filter()是python的三个高阶函数。所谓高阶函数,指的是将函数作为参数并返回函数作为结果的函数。下面代码的sing_ready只是一个简单高阶函数示例: def ready(name): return f"ready,{name}!"
入门使用 # 示例代码 warframe = ["saryn", "wisp", "volt"] counts = [len(n) for n in warframe] for i,j in zip(warframe,counts): pr
前言 功能描述:批量重命名指定目录下的文件,文件名加前缀,默认格式为“目录名_原文件名”。 示例代码 import argparse import os import sys import logging def gen_args(): """ 说明 解析命令行参数 &
前言 常见的应用配置方式有环境变量和配置文件,对于微服务应用,还会从配置中心加载配置,比如nacos、etcd等,有的应用还会把部分配置写在数据库中。此处主要记录从环境变量、.env文件、.ini文件、.yaml文件、.toml文件、.json文件读取配置。 ini文件 ini文件格式一般如下: [
前言 在设计API返回内容时,通常需要与前端约定好API返回响应体内容的格式。这样方便前端进行数据反序列化时相应的解析处理,也方便其它服务调用。不同公司有不同的响应内容规范要求,这里以常见的JSON响应体为例: { "code": 200, "data": {
前言 我们一般使用如下方式点击元素: elem = driver.find_element(...) elem.click() # 或者使用带等待条件的方式 elem = WebDriverWait(driver, 10).until(EC.xxx(...)) elem.click() 正常情况下,
前言 从环境变量和配置文件中获取配置参数,相关库: python-dotenv:第三方库,需要使用pip安装 configparser:标准库 示例代码 test.ini [mysql] host = "192.168.0.10" port = 3306 user = &quot
前言 Relative Locators,相对定位器,是Selenium 4引入的一个新的定位器,相对定位器根据源点元素去定位相对位置的其它元素。 相对定位方法其实是基于JavaScript的 getBoundingClientRect() 而实现,简单的页面还行,复杂页面中可能会定位到需要相同类型
简介 The pytest framework makes it easy to write small, readable tests, and can scale to support complex functional testing for applications and librari
简介 Faker库可用于随机生成测试用的虚假数据。 可生成的数据参考底部的参考链接。 安装: python -m pip install faker 快速入门 from faker import Faker # 实例化一个对象,本地化使用中国 fk - Faker(locale="zh_C
前言 原本应用的日志是全部输出到os的stdout,也就是控制台输出。因其它团队要求也要保留日志文件,便于他们用其他工具统一采集,另一方面还要保留控制台输出,便于出问题的时候自己直接看pod日志。具体需求如下: 日志支持同时控制台输出和文件输出 控制台的输出级别可以高点,比如WARNING,个人这边
按列从多个文件中构建 假设有两个csv文件,列不相同,需要整合为一个dataframe,使用glob模块: from glob import glob import pandas as pd # glob会返回任意排序的文件名,所以需要sort排序 some_files = sorted(glob(
简介 diagrams是python的一个第三方库,用于实现使用代码绘制架构图。 安装 依赖于 Graphviz,安装diagrams之前需要先安装 Graphviz(下载压缩包后,将bin目录添加到系统环境变量Path里即可)。 python3 -m pip install diagrams 快速
前言 最近有个个人需求是要把多个图片文件合并为一个PDF文件,这样方便用PDF阅读器连续看,避免界面点一下,只会图片放大。(比如看漫画) 主要思路是先把单张图片转换成单个PDF文件,然后把PDF文件进行合并。原先是用WPS的转换工具做的,但WPS每次只能批量转换30张,如果有大量图片文件,用WPS就
前言 版本: python:3.9 selenium:4.1.5 获取元素文本 text = driver.find_element(by=By.XPATH, value="").text 获取元素属性值 attr1 = driver.find_element(by=By.XPA
Python中有个内置的函数叫做 enumerate,可以在迭代时返回元素的索引。 # 示例代码01 warframe = ["saryn", "wisp", "volt"] for i,name in enumerate(warframe
前言 版本: python:3.9 selenium:4.1.5 浏览器:firefox 创建浏览器对象 from selenium import webdriver driver = webdriver.Firefox(executable_path=r"C:\software\sele
前言 selenium提供八种元素定位的方法: find_element_by_id(): 通过id定位。一个页面中的id是唯一的。有id的话尽量使用id定位。 find_element_by_xpath(): 通过xpath语法定位(常用) find_element_by_link_text():