如何解决如何跟踪从多处理池返回的异步结果
没有看到实际的代码,我只能概括地回答。但是有两种通用的解决方案。
首先,不要使用acallback
并忽略AsyncResult
s
,而是将它们存储在某种集合中。然后,您可以使用该集合。例如,如果您希望能够使用该函数作为键来查找该函数的结果,则只需dict
使用以下函数创建一个键:
def in_parallel(funcs):
results = {}
pool = mp.Pool()
for func in funcs:
results[func] = pool.apply_async(func)
pool.close()
pool.join()
return {func: result.get() for func, result in results.items()}
另外,您可以更改回调函数以按键将结果存储在集合中。例如:
def in_parallel(funcs):
results = {}
pool = mp.Pool()
for func in funcs:
def callback(result, func=func):
results[func] = result
pool.apply_async(func, callback=callback)
pool.close()
pool.join()
return results
我将函数本身用作键。但是,您想使用索引来代替,这很容易。您拥有的任何值都可以用作键。
同时,您链接的示例实际上只是对一堆参数调用相同的函数,等待所有参数完成,然后以任意顺序将结果以可迭代的方式保留。imap_unordered
确实是这样,但是简单得多。您可以用以下代码替换链接代码中的整个复杂的东西:
pool = mp.Pool()
results = list(pool.imap_unordered(foo_pool, range(10)))
pool.close()
pool.join()
然后,如果您希望结果按其原始顺序而不是任意顺序排列,则可以切换到imap
或map
代替。所以:
pool = mp.Pool()
results = pool.map(foo_pool, range(10))
pool.close()
pool.join()
如果您需要类似但又太复杂以至于无法融入该map
范式的内容,concurrent.futures
则可能会使您的生活比轻松multiprocessing
。如果您使用的是Python
2.x,则必须安装backport。但是,然后您可以完成用AsyncResult
s或callback
s(或map
)完成的工作,例如将一大堆期货组成一个大期货。请参阅链接文档中的示例。
最后一点:
如果您无法修改函数,则可以随时对其进行包装。例如,假设我有一个返回数字平方的函数,但是我试图构建一个将数字异步映射到其平方的字典,所以我也需要将原始数字作为结果的一部分。这很容易:
def number_and_square(x):
return x, square(x)
现在,我可以apply_async(number_and_square)
代替square
而获得所需的结果。
在上面的示例中,我没有这样做,因为在第一种情况下,我是从调用方将密钥存储到集合中的,而在第二种情况下,我将其绑定到了回调函数中。但是,将它们绑定到函数周围的包装就像这两个函数一样容易,并且在这两个函数都不存在的情况下可能是合适的。
解决方法
我试图将多处理功能添加到某些功能无法修改的代码中。我想将这些功能作为作业异步提交到多处理池。我正在做类似此处所示代码的操作。但是,我不确定如何跟踪结果。我如何知道返回的结果与哪个应用函数相对应?
需要强调的要点是,我无法修改现有功能(其他东西依赖于它们的原样保留),并且结果可以按与功能作业应用于池的顺序不同的顺序返回。
感谢您对此的任何想法!
编辑:下面的一些尝试代码:
import multiprocessing
from multiprocessing import Pool
import os
import signal
import time
import inspect
def multiply(multiplicand1=0,multiplicand2=0):
return multiplicand1*multiplicand2
def workFunctionTest(**kwargs):
time.sleep(3)
return kwargs
def printHR(object):
"""
This function prints a specified object in a human readable way.
"""
# dictionary
if isinstance(object,dict):
for key,value in sorted(object.items()):
print u'{a1}: {a2}'.format(a1=key,a2=value)
# list or tuple
elif isinstance(object,list) or isinstance(object,tuple):
for element in object:
print element
# other
else:
print object
class Job(object):
def __init__(
self,workFunction=workFunctionTest,workFunctionKeywordArguments={'testString': "hello world"},workFunctionTimeout=1,naturalLanguageString=None,classInstance=None,resultGetter=None,result=None
):
self.workFunction=workFunction
self.workFunctionKeywordArguments=workFunctionKeywordArguments
self.workFunctionTimeout=workFunctionTimeout
self.naturalLanguageString=naturalLanguageString
self.classInstance=self.__class__.__name__
self.resultGetter=resultGetter
self.result=result
def description(self):
descriptionString=""
for key,value in sorted(vars(self).items()):
descriptionString+=str("{a1}:{a2} ".format(a1=key,a2=value))
return descriptionString
def printout(self):
"""
This method prints a dictionary of all data attributes.
"""
printHR(vars(self))
class JobGroup(object):
"""
This class acts as a container for jobs. The data attribute jobs is a list of job objects.
"""
def __init__(
self,jobs=None,naturalLanguageString="null",result=None
):
self.jobs=jobs
self.naturalLanguageString=naturalLanguageString
self.classInstance=self.__class__.__name__
self.result=result
def description(self):
descriptionString=""
for key,a2=value))
return descriptionString
def printout(self):
"""
This method prints a dictionary of all data attributes.
"""
printHR(vars(self))
def initialise_processes():
signal.signal(signal.SIGINT,signal.SIG_IGN)
def execute(
jobObject=None,numberOfProcesses=multiprocessing.cpu_count()
):
# Determine the current function name.
functionName=str(inspect.stack()[0][3])
def collateResults(result):
"""
This is a process pool callback function which collates a list of results returned.
"""
# Determine the caller function name.
functionName=str(inspect.stack()[1][3])
print("{a1}: result: {a2}".format(a1=functionName,a2=result))
results.append(result)
def getResults(job):
# Determine the current function name.
functionName=str(inspect.stack()[0][3])
while True:
try:
result=job.resultGetter.get(job.workFunctionTimeout)
break
except multiprocessing.TimeoutError:
print("{a1}: subprocess timeout for job".format(a1=functionName,a2=job.description()))
#job.result=result
return result
# Create a process pool.
pool1 = multiprocessing.Pool(numberOfProcesses,initialise_processes)
print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName,a2=str(pool1),a3=str(numberOfProcesses)))
# Unpack the input job object and submit it to the process pool.
print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName,a2=jobObject))
if isinstance(jobObject,Job):
# If the input job object is a job,apply it to the pool with its associated timeout specification.
# Return a list of results.
job=jobObject
print("{a1}: job submitted to pool: {a2}".format(a1=functionName,a2=job.description()))
# Apply the job to the pool,saving the object pool.ApplyResult to the job object.
job.resultGetter=pool1.apply_async(
func=job.workFunction,kwds=job.workFunctionKeywordArguments
)
# Get results.
# Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
print("{a1}: getting results for job...".format(a1=functionName))
job.result=getResults(job)
print("{a1}: job completed: {a2}".format(a1=functionName,a2=job.description()))
print("{a1}: job result: {a2}".format(a1=functionName,a2=job.result))
# Return the job result from execute.
return job.result
pool1.terminate()
pool1.join()
elif isinstance(jobObject,JobGroup):
# If the input job object is a job group,cycle through each job and apply it to the pool with its associated timeout specification.
for job in jobObject.jobs:
print("{a1}: job submitted to pool: {a2}".format(a1=functionName,a2=job.description()))
# Apply the job to the pool,saving the object pool.ApplyResult to the job object.
job.resultGetter=pool1.apply_async(
func=job.workFunction,kwds=job.workFunctionKeywordArguments
)
# Get results.
# Cycle through each job and and append the result for the job to a list of results.
results=[]
for job in jobObject.jobs:
# Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
print("{a1}: getting results for job...".format(a1=functionName))
job.result=getResults(job)
print("{a1}: job completed: {a2}".format(a1=functionName,a2=job.description()))
#print("{a1}: job result: {a2}".format(a1=functionName,a2=job.result))
# Collate the results.
results.append(job.result)
# Apply the list of results to the job group data attribute results.
jobObject.results=results
print("{a1}: job group results: {a2}".format(a1=functionName,a2=jobObject.results))
# Return the job result list from execute.
return jobObject.results
pool1.terminate()
pool1.join()
else:
# invalid input object
print("{a1}: invalid job object {a2}".format(a1=functionName,a2=jobObject))
def main():
print('-'*80)
print("MULTIPROCESSING SYSTEM DEMONSTRATION\n")
# Create a job.
print("# creating a job...\n")
job1=Job(
workFunction=workFunctionTest,workFunctionTimeout=4
)
print("- printout of new job object:")
job1.printout()
print("\n- printout of new job object in logging format:")
print job1.description()
# Create another job.
print("\n# creating another job...\n")
job2=Job(
workFunction=multiply,workFunctionKeywordArguments={'multiplicand1': 2,'multiplicand2': 3},workFunctionTimeout=6
)
print("- printout of new job object:")
job2.printout()
print("\n- printout of new job object in logging format:")
print job2.description()
# Create a JobGroup object.
print("\n# creating a job group (of jobs 1 and 2)...\n")
jobGroup1=JobGroup(
jobs=[job1,job2],)
print("- printout of new job group object:")
jobGroup1.printout()
print("\n- printout of new job group object in logging format:")
print jobGroup1.description()
# Submit the job group.
print("\nready to submit job group")
response=raw_input("\nPress Enter to continue...\n")
execute(jobGroup1)
response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n")
# Demonstrate timeout.
print("\n # creating a new job in order to demonstrate timeout functionality...\n")
job3=Job(
workFunction=workFunctionTest,workFunctionTimeout=1
)
print("- printout of new job object:")
job3.printout()
print("\n- printout of new job object in logging format:")
print job3.description()
print("\nNote the timeout specification of only 1 second.")
# Submit the job.
print("\nready to submit job")
response=raw_input("\nPress Enter to continue...\n")
execute(job3)
response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.")
print('-'*80)
if __name__ == '__main__':
main()
编辑:由于以下陈述的原因,此问题被[保留]:
“要求代码的问题必须表现出对所解决问题的最低限度的理解。包括尝试的解决方案,为何不起作用以及预期的结果。另请参见:堆栈溢出问题清单”
这个问题不是在请求代码。它要求思想,一般指导。展示了对所考虑问题的最低理解(请注意正确使用术语“多处理”,“池”和“异步”,并注意对先前代码的引用)。关于尝试的解决方案,我承认尝试解决方案将是有益的。我现在添加了这样的代码。我希望我已经解决了引起[搁置]状态的关切。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。