
Python pathos map returns "cannot pickle '_hashlib.HASH' object"

How do I fix the Python pathos map error "cannot pickle '_hashlib.HASH' object"?

Here is my code (based on this answer from @Mike McKerns):

import datetime

from pathos.multiprocessing import ProcessingPool as Pool

class dispatcher():

    def __init__(self, controller):
        self.controller = controller
        self.sf = self.controller.sf
        self.jobs = []
        self.completed_jobs = []
        self.containers = []

    def add_job(self, container):
        self.containers.append(container)
        self.jobs.append(Job(**container.job_options))

    def submit_current_jobs(self):
        if self.jobs:
            args = []
            for i, job in enumerate(self.jobs):
                args.append([job.source, job.sql, i])
            self.completed_jobs = Pool().map(self.submit_to_snowflake, args)
        else:
            with self.controller.output:
                print('Cannot submit the current jobs, none have been created!')

    def submit_to_snowflake(self, job_source, job_sql, job_index):
        print(f"src={job_source}, idx={job_index}, sql len={len(job_sql)}")
        self.containers[job_index].df = self.sf.run(self.controller.query_ui.output, job_sql) if job_source == 'query' else self.sf.run(self.controller.upload_ui.widgets['log'], job_sql)
        
class Job():

    def __init__(self, **options):
        self.sql = options['sql']
        self.source = 'query' if 'project' in options else 'upload'
        self.timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
        if self.source == 'query':
            self.project = options['project']
            self.query = options['query']
            self.events = options['events']
        else:
            self.path = options['path']

Here are the types of the objects passed to the submit_to_snowflake function:

<class 'str'> <class 'str'> <class 'int'>

I have verified that args is a list containing lists of length 3, each specifying job_source, job_sql and job_index.
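As an aside on the argument shape: if pathos's ProcessingPool.map follows the builtin map convention of one iterable per function parameter (an assumption worth checking against the pathos docs), then a list of [source, sql, index] triples arrives as a single list per call rather than three separate arguments. A plain-Python sketch of transposing such an args list into per-parameter sequences with zip, using made-up stand-in values:

```python
# Hypothetical stand-ins for the job data in the question.
args = [
    ["query", "SELECT 1", 0],
    ["upload", "COPY INTO t", 1],
]

# zip(*args) transposes the list of triples into three tuples,
# one per parameter, suitable for map(f, sources, sqls, indices).
sources, sqls, indices = zip(*args)

print(sources)  # ('query', 'upload')
print(sqls)     # ('SELECT 1', 'COPY INTO t')
print(indices)  # (0, 1)
```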

The dispatcher is instantiated in a UI class; the sequence of calls that produces the error is:

self.dispatcher = dispatcher.dispatcher(self)
self.dispatcher.add_job(self.container)   
self.dispatcher.submit_current_jobs()

The error is:

~\.conda\envs\nemawashi\lib\site-packages\dill\_dill.py in save_module_dict(pickler,obj)
    939             # we only care about session the first pass thru
    940             pickler._session = False
--> 941         StockPickler.save_dict(pickler,obj)
    942         log.info("# D2")
    943     return

~\.conda\envs\nemawashi\lib\pickle.py in save_dict(self,obj)
    969 
    970         self.memoize(obj)
--> 971         self._batch_setitems(obj.items())
    972 
    973     dispatch[dict] = save_dict

~\.conda\envs\nemawashi\lib\pickle.py in _batch_setitems(self,items)
    995                 for k,v in tmp:
    996                     save(k)
--> 997                     save(v)
    998                 write(SETITEMS)
    999             elif n:

~\.conda\envs\nemawashi\lib\pickle.py in save(self,obj,save_persistent_id)
    576                 reduce = getattr(obj,"__reduce_ex__",None)
    577                 if reduce is not None:
--> 578                     rv = reduce(self.proto)
    579                 else:
    580                     reduce = getattr(obj,"__reduce__",None)

TypeError: cannot pickle '_hashlib.HASH' object

Is this because I am calling another function (self.sf.run) from inside self.submit_to_snowflake?

Thanks
