JSON serializer objects fail in multiprocessing calls - TypeError: 'XXX' object is not callable

I am using a JSON serializer helper module to get easy access to dictionary objects (which are basically received as JSON).

jsondict.py

"""Utilities for working with JSON and json-like structures - deeply nested Python dicts and lists

This lets us iterate over child nodes and access elements with a dot-notation.
"""
import sys
isPy3 = sys.version_info[0]==3
if isPy3:
    def __alt_str__(v,enc='utf8'):
        return v if isinstance(v,bytes) else v.encode(enc)
    __strTypes__ = (str,bytes)
else:
    __alt_str__ = unicode
    __strTypes__ = (str,unicode)

class MyLocals(object):
    pass
mylocals = MyLocals()

def setErrorCollect(collect):
    mylocals.error_collect = collect

setErrorCollect(False)

def errorValue(x):
    if isinstance(x,__strTypes__):
         return repr(x) if ' ' in x else x
    return 'None' if x is None else str(x)
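def condJSON(v,__name__=''):
    # Wrap dicts and lists; any other value is returned unchanged
    return JSONDict(v,__name__=__name__) if isinstance(v,dict) else JSONList(v,__name__=__name__) if isinstance(v,list) else v

def condJSONSafe(v,__name__=''):
    return JSONDictSafe(v,__name__=__name__) if isinstance(v,dict) else JSONListSafe(v,__name__=__name__) if isinstance(v,list) else v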

class JSONListIter(object):
    def __init__(self,lst,conv):
        self.lst = lst
        self.i = -1
        self.conv = conv

    def __iter__(self):
        return self

    def next(self):
        if self.i<len(self.lst)-1:
            self.i += 1         
            return self.conv(self.lst[self.i])
        else:
            raise StopIteration

    if isPy3:
        __next__ = next
        del next

class JSONList(list):
    def __init__(self,v,__name__=''):
        list.__init__(self,v)
        self.__name__ = __name__
    def __getitem__(self,x):
        return condJSON(list.__getitem__(self,x),__name__='%s\t%s'%(self.__name__,errorValue(x)))
    def __iter__(self):
        return JSONListIter(self,condJSON)

class JSONListSafe(JSONList):
    def __getitem__(self,x):
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            return condJSONSafe(list.__getitem__(self,x),__name__=__name__)
        except:
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')
    def __iter__(self):
        return JSONListIter(self,condJSONSafe)

class JSONStrSafe(str):
    def __getattr__(self,attr):
        return self
    __getitem__ = __getattr__


class JSONDict(dict):
    "Allows dotted access"
    def __new__(cls,*args,**kwds):
        __name__ = kwds.pop('__name__')
        self = dict.__new__(cls,**kwds)
        self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        # condJSON passes the wrapped dict positionally, so *args must be accepted
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    def __getattr__(self,attr,default=None):
        if attr in self:
            return condJSON(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSON(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return JSONDictSafe(self,__name__=self.__name__)
        else:
            raise AttributeError("No attribute or key named '%s'" % attr)

    def sorted_items(self,accept=None,reject=lambda i: i[0]=='__name__'):
        if accept or reject:
            if not accept:
                f = lambda i: not reject(i)
            elif not reject:
                f = accept
            else: #both
                f = lambda i: accept(i) and not reject(i)
            # items() works on both Python 2 and 3; iteritems() exists only on Python 2
            return sorted((k,condJSON(v,__name__=k)) for k,v in self.items() if f((k,v)))
        else:
            return sorted((k,condJSON(v,__name__=k)) for k,v in self.items())

    def sorted_keys(self):
        return sorted(self.keys())

class JSONDictSafe(JSONDict):
    "Allows dotted access"
    def __getattr__(self,attr,default=None):
        if attr in self:
            return condJSONSafe(self[attr],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif __alt_str__(attr) in self:
            return condJSONSafe(self[__alt_str__(attr)],__name__='%s\t%s'%(self.__name__,errorValue(attr)))
        elif attr=='__safe__':
            return self
        else:
            return JSONStrSafe('')

    def __getitem__(self,x):
        __name__='%s\t%s'%(self.__name__,errorValue(x))
        try:
            return condJSONSafe(dict.__getitem__(self,x),__name__=__name__)
        except KeyError:
            if mylocals.error_collect:
                mylocals.error_collect(__name__)
            return JSONStrSafe('')

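    def sorted_items(self,accept=None,reject=lambda i: i[0]=='__name__'):
        # Mirrors JSONDict.sorted_items, wrapping values with condJSONSafe
        if accept or reject:
            if not accept:
                f = lambda i: not reject(i)
            elif not reject:
                f = accept
            else: #both
                f = lambda i: accept(i) and not reject(i)
            return sorted((k,condJSONSafe(v,__name__=k)) for k,v in self.items() if f((k,v)))
        else:
            return sorted((k,condJSONSafe(v,__name__=k)) for k,v in self.items())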

Suppose the JSON object looks like this:

data = {'name': 'john','age': 20,'address': {'city':'xyz','country':'XZ','zip': 1223}}

json_obj = condJSONSafe(data)

I can then access the data using dot notation:

print(json_obj.name) # --> john
print(json_obj.address.country) # --> XZ

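This worked well until I added multiprocessing to my code to improve performance.

I extracted a certain amount of data from the JSON (after using the helper above to make it accessible with dot notation) and stored it in separate lists, say lists a, b, and c.

Then I passed these lists to a multiprocessing pool: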

with mp.Pool(processes=mp.cpu_count()) as pool:
    res = pool.starmap(self.process_records,zip(self.a,self.b,self.c))
pool.join()

This ends with the following error:

TypeError: 'JSONStrSafe' object is not callable

I tried this answer, but it did not work for me. Any help is appreciated. Thanks in advance.

Edit: reproducible example:

test.py

import jsondict
import multiprocessing as mp
import itertools

def process_records(data,metadata):
    print(data.name)
    print(metadata)
    # code as required


if __name__ == '__main__':
    data = {
        "metadata": "test_data","cust_list": [
            {
                'name': 'john','address': {
                    'city':'xyz','zip': 1223
                }
            },{
                'name': 'michal','age': 25,'address': {
                    'city':'abc','country':'CX','zip': 3435
                }
            },{
                'name': 'david','age': 30,'address': {
                    'city':'mnl','country':'TD','zip': 6767
                }
            }
        ]
    }

    json_obj = jsondict.condJSONSafe(data)

    print(json_obj.metadata) #will print 'test_data'
    print(json_obj.cust_list[0].name) #will print 'john'
    print(json_obj.cust_list[2].address.city) #will print 'mnl'


    with mp.Pool(processes=mp.cpu_count()) as pool:
        res = pool.starmap(process_records,zip(json_obj.cust_list,itertools.repeat(json_obj.metadata))) # --> not working
        #res = pool.map(process_records,zip(json_obj.cust_list,itertools.repeat(json_obj.metadata))) --> not working
        #res = [pool.apply_async(process_records,d,json_obj.metadata) for d in json_obj.cust_list] --> not working
        #apply --> not working
    pool.join()

Output:

test_data
john
mnl
Traceback (most recent call last):
  File "c:/Users/mohanlal/Desktop/Mock/json_err/test_app.py",line 53,in <module>
    res = pool.starmap(process_records,zip(json_obj.cust_list,itertools.repeat(json_obj.metadata))) # --> not working
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py",line 268,in starmap
    return self._map_async(func,iterable,starmapstar,chunksize).get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py",line 608,in get
    raise self._value
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py",line 385,in _handle_tasks
    put(task)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\connection.py",line 206,in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\reduction.py",line 51,in dumps
    cls(buf,protocol).dump(obj)
TypeError: 'JSONStrSafe' object is not callable

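I tried starmap, map, apply_async, and apply, and got the same error each time.

I have also tried the solution given in the similar question linked above, modifying the class where the error occurs as follows: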

import re
dunder_pattern = re.compile("__.*__")
protected_pattern = re.compile("_.*")

class JSONStrSafe(str):
    def __getattr__(self,attr):
        if dunder_pattern.match(attr) or protected_pattern.match(attr):
            return super().__getattr__(attr)
        return self
    def __getstate__(self): return self.__dict__
    def __setstate__(self,d): self.__dict__.update(d)

    __getitem__ = __getattr__

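But the problem persists.

As suggested in the comments, I made the change in all three places where __getattr__ is defined and tried again. I got a different error, as follows: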

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py",line 249,in _bootstrap
    self.run()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py",line 93,in run
    self._target(*self._args,**self._kwargs)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py",line 108,in worker
    task = get()
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\queues.py",line 345,in get
    return _ForkingPickler.loads(res)
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py",line 89,in __new__
    __name__ = kwds.pop('__name__')
Process SpawnPoolWorker-2:
Process SpawnPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py",in get
    return _ForkingPickler.loads(res)
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py",in _bootstrap
    self.run()
  File "c:\Users\mohanlal\Desktop\Mock\json_err\jsondict.py",in __new__
    __name__ = kwds.pop('__name__')
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\process.py",**self._kwargs)
KeyError: '__name__'
  File "C:\Users\mohanlal\AppData\Local\Programs\Python\Python36\lib\multiprocessing\pool.py",in __new__
    __name__ = kwds.pop('__name__')
KeyError: '__name__'

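Solution

The problem is that you are in a "pickle". Pardon the pun: you have a pickle problem. When you use multiprocessing, the arguments to your worker function or method are pickled. Usually the defaults used to serialize and deserialize an object's state are fine, but not in your case. The default save and load operations for serializing and deserializing an object are: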

def save(obj):
    return (obj.__class__,obj.__dict__)

def load(cls,attributes):
    obj = cls.__new__(cls)
    obj.__dict__.update(attributes)
    return obj

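Note that when an object is unpickled, its __init__ method is not called, but its __new__ method is, and that is where the problem lies. I had to modify the __new__ method of class JSONDict so that it can tell when it is being called during unpickling, in which case '__name__' may not be present in the keyword arguments. I then had to add custom __getstate__ and __setstate__ methods to the class to override the default way it saves and restores the object's attributes (the __init__ method is unchanged):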

class JSONDict(dict):
    "Allows dotted access"
    def __new__(cls,*args,**kwds):
        self = dict.__new__(cls,**kwds)
        if kwds and '__name__' in kwds:
            __name__ = kwds.pop('__name__')
            self.__name__ = __name__
        return self

    def __init__(self,*args,**kwds):
        # condJSON passes the wrapped dict positionally, so *args must be accepted
        kwds.pop('__name__','')
        dict.__init__(self,*args,**kwds)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self,d):
        self.__dict__ = d


    """ The other methods remain unmodified """

Prints:

test_data
john
mnl
john
test_data
michal
david
test_data
test_data
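
To see for yourself that unpickling calls __new__ with no keyword arguments and skips __init__, here is a minimal sketch. NamedDict is a hypothetical stand-in for JSONDict, not the class from jsondict.py, and the calls list exists only to record which hooks run:

```python
import pickle

class NamedDict(dict):
    """Hypothetical stand-in for JSONDict that records which hooks run."""
    calls = []

    def __new__(cls, *args, **kwds):
        NamedDict.calls.append(('__new__', sorted(kwds)))
        self = dict.__new__(cls)
        # Tolerate a missing '__name__', as happens during unpickling
        if '__name__' in kwds:
            self.__name__ = kwds['__name__']
        return self

    def __init__(self, *args, **kwds):
        NamedDict.calls.append(('__init__', sorted(kwds)))
        kwds.pop('__name__', '')
        dict.__init__(self, *args, **kwds)

# Normal construction runs both hooks and passes the keyword through
original = NamedDict({'city': 'xyz'}, __name__='address')
print(NamedDict.calls)

# A pickle round trip runs only __new__, and without any keywords
NamedDict.calls.clear()
restored = pickle.loads(pickle.dumps(original))
print(NamedDict.calls)
print(restored, restored.__name__)
```

With an unconditional kwds.pop('__name__') in __new__, the second step is where the KeyError from the question would be raised.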

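Update

I kept scratching my head over why it was necessary to provide the __getstate__ and __setstate__ pickle methods, since what they do should be the default behavior anyway. Suppose you modify the program just to test pickling, without even running the Pool methods, by inserting the following line: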

json_obj = condJSONSafe(data)
# insert this line:
import pickle,sys; print(pickle.dumps(json_obj)); sys.exit(0)

It prints:

Traceback (most recent call last):
  File "test.py",line 205,in <module>
    import pickle;  print('pickle'); print(pickle.dumps(json_obj)); sys.exit(0)
TypeError: 'JSONStrSafe' object is not callable
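
The same one-line test can be wrapped in a small helper and run on each argument before it is handed to the pool. This is only a sketch; pickling_error is a hypothetical name, not part of pickle or multiprocessing:

```python
import pickle

def pickling_error(obj):
    """Return the exception raised by a pickle round trip, or None if it succeeds."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception as exc:
        return exc
    return None

# A plain dict survives the round trip; a lambda cannot be pickled
print(pickling_error({'name': 'john'}))
print(type(pickling_error(lambda x: x)).__name__)
```

Checking the full round trip rather than dumps alone matters because some objects serialize without complaint and only fail when they are loaded in the worker process.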

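After adding print statements in the right places, it became clear that the problem lies in the __getattr__ method of class JSONDictSafe. When pickle checks whether the class implements __getstate__ and __setstate__ and it does not, __getattr__ ends up being called and returns a JSONStrSafe instance as the value of those attributes. pickle then tries to call that instance, which produces the TypeError. So rather than supplying these attributes by defining the methods as I did, it is simpler to add a check to __getattr__ so that it raises AttributeError for them instead of returning a JSONStrSafe instance.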
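
A minimal, self-contained sketch of such a check follows. SafeStr, BrokenSafeDict, and FixedSafeDict are hypothetical stand-ins for JSONStrSafe and JSONDictSafe, not the classes from jsondict.py, and the rule used here (reject every dunder name) is an assumption about what a suitable check could look like. On the Python 3.6 shown in the tracebacks the unguarded class fails while dumping; on newer interpreters the failure may surface while loading instead, so the sketch tests a full round trip.

```python
import pickle

class SafeStr(str):
    """Stand-in for JSONStrSafe: any missing attribute returns the string itself."""
    def __getattr__(self, attr):
        return self

class BrokenSafeDict(dict):
    """Answers every missing attribute, including pickle's special hooks."""
    def __getattr__(self, attr):
        return self[attr] if attr in self else SafeStr('')

class FixedSafeDict(dict):
    """Same behavior, but refuses to answer for dunder names."""
    def __getattr__(self, attr):
        # Let pickle see that __getstate__/__setstate__ are really missing
        if attr.startswith('__') and attr.endswith('__'):
            raise AttributeError(attr)
        return self[attr] if attr in self else SafeStr('')

def try_roundtrip(obj):
    """Return the unpickled copy, or the TypeError raised along the way."""
    try:
        return pickle.loads(pickle.dumps(obj))
    except TypeError as exc:
        return exc

broken = BrokenSafeDict(name='john')
broken.label = 'root'   # instance state, playing the role of __name__
fixed = FixedSafeDict(name='john')
fixed.label = 'root'

print(repr(try_roundtrip(broken)))
print(try_roundtrip(fixed), try_roundtrip(fixed).label)
```

Raising AttributeError is what tells pickle to fall back to its default handling of the instance's __dict__, which is why no explicit __getstate__ or __setstate__ is needed once the check is in place.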
