How to fix "TypeError: cannot pickle '_thread.lock' object" in a Dask compute
I am trying to use dask for multiprocessing. I have a function that must run over 10,000 files and generates files as output. The function takes a file from an S3 bucket as input and works with another file in S3 that has a similar date and time. I am doing everything in JupyterLab.
Here is my function:
def get_temp(file, name):
    # parse year/month/day/hour/minute from the file name
    d = [name[0:4], name[4:6], name[6:8], name[9:11], name[11:13]]
    f_zip = gzip.decompress(file)
    yr = d[0]
    mo = d[1]
    da = d[2]
    hr = d[3]
    mn = d[4]
    fs = s3fs.S3FileSystem(anon=True)
    period = pd.Period(str(yr)+str('-')+str(mo)+str('-')+str(da), freq='D')
    # period.dayofyear
    dy = period.dayofyear
    cc = [7, 8, 9, 10, 11, 12, 13, 14, 15, 16]  # look at the IR channels only for now
    dat = xr.open_dataset(f_zip)
    dd = dat[['recNum', 'trackLat', 'trackLon', 'temp']]
    dd = dd.to_dataframe()
    dd = dd.dropna()
    dd['num'] = np.arange(len(dd))
    l = dd.where((dd.trackLat>-50.0) & (dd.trackLat<50.0) & (dd.trackLon>-110.0) & (dd.trackLon<10.0))
    l = l.dropna()
    l.reset_index()
    dy = "{0:0=3d}".format(dy)
    # opening GOES data from S3
    F = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C07'+'*')[int(mn)//15]))
    # converting lat/lon to the satellite fixed-grid coordinates
    req = F['goes_imager_projection'].semi_major_axis
    oneovf = F['goes_imager_projection'].inverse_flattening
    rpol = F['goes_imager_projection'].semi_minor_axis
    e = 0.0818191910435
    sat_h = F['goes_imager_projection'].perspective_point_height
    H = req + sat_h
    gc = np.deg2rad(F['goes_imager_projection'].longitude_of_projection_origin)
    phi = np.deg2rad(l.trackLat.values)
    gam = np.deg2rad(l.trackLon.values)
    phic = np.arctan((rpol**2/req**2)*np.tan(phi))
    rc = rpol/np.sqrt((1-e**2*np.cos(phic)**2))
    sx = H-rc*np.cos(phic)*np.cos(gam-gc)
    sy = -rc*np.cos(phic)*np.sin(gam-gc)
    sz = rc*np.sin(phic)
    yy = np.arctan(sz/sx)
    xx = np.arcsin(-sy/(np.sqrt(sx**2+sy**2+sz**2)))
    for i in range(len(xx)):
        for c in range(len(cc)):
            ch = "{0:0=2d}".format(cc[c])
            F1 = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str(hr)+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[0]))
            F2 = xr.open_dataset(fs.open(fs.glob('s3://noaa-goes16/ABI-L1b-RadF/'+str(yr)+'/'+str(dy)+'/'+str("{0:0=2d}".format(hr))+'/'+'OR_ABI-L1b-RadF-M3C'+ch+'*')[-1]))
            G1 = F1.where((F1.x >= (xx[i]-0.005)) & (F1.x <= (xx[i]+0.005)) & (F1.y >= (yy[i]-0.005)) & (F1.y <= (yy[i]+0.005)), drop=True)
            G2 = F2.where((F2.x >= (xx[i]-0.005)) & (F2.x <= (xx[i]+0.005)) & (F2.y >= (yy[i]-0.005)) & (F2.y <= (yy[i]+0.005)), drop=True)
            G = xr.concat([G1, G2], dim='time')
            G = G.assign_coords(channel=(ch))
            if c == 0:
                T = G
            else:
                T = xr.concat([T, G], dim='channel')
        T = T.assign_coords(temp=(str(l['temp'][i])))
        print(l.iloc[i]['num'])
        path = name+'_'+str(int(l.iloc[i]['num']))+'.nc'
        T.to_netcdf(path)
        # zipping the file
        with zipfile.ZipFile(name+'_'+str(int(l.iloc[i]['num']))+'.zip', 'w', compression=zipfile.ZIP_DEFLATED) as zf:
            zf.write(path, arcname=str(name+'_'+str(int(l.iloc[i]['num']))+'.nc'))
        # storing it to S3
        s3.Bucket(BUCKET).upload_file(path[:-3]+'.zip', "Output/" + path[:-3]+'.zip')
Here is how I fetch the data from S3:
s3 = boto3.resource('s3')
s3client = boto3.client(
    's3', region_name='us-east-1'
)
bucketname = s3.Bucket('temp')
filedata = []
keys = []
names = []
for my_bucket_object in bucketname.objects.all():
    keys.append(my_bucket_object.key)
for i in range(1, 21):
    fileobj = s3client.get_object(
        Bucket='temp', Key=(keys[i]))
    filedata.append(fileobj['Body'].read())
    names.append(keys[i][10:-3])
Initially, I am only trying to run 20 files as a test.
temp_files = []
for i in range(20):
    s3_ds = dask.delayed(get_temp)(filedata[i], names[i])
    temp_files.append(s3_ds)
temp_files = dask.compute(*temp_files)
Here is the full error log:
distributed.protocol.pickle - INFO - Failed to serialize <function get_temp at 0x7f20a9cb8550>. Exception: cannot pickle '_thread.lock' object
---------------------------------------------------------------------------
KeyError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
3319 with _cache_lock:
-> 3320 result = cache_dumps[func]
3321 except KeyError:
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py in __getitem__(self,key)
1572 def __getitem__(self,key):
-> 1573 value = super().__getitem__(key)
1574 self.data.move_to_end(key)
/srv/conda/envs/notebook/lib/python3.8/collections/__init__.py in __getitem__(self,key)
1009 return self.__class__.__missing__(self,key)
-> 1010 raise KeyError(key)
1011 def __setitem__(self,key,item): self.data[key] = item
KeyError: <function get_temp at 0x7f20a9cb8550>
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x,buffer_callback,protocol)
52 buffers.clear()
---> 53 result = cloudpickle.dumps(x,**dump_kwargs)
54 elif not _always_use_pickle_for(x) and b"__main__" in result:
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj,protocol,buffer_callback)
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self,obj)
562 try:
--> 563 return Pickler.dump(self,obj)
564 except RuntimeError as e:
TypeError: cannot pickle '_thread.lock' object
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-77-fa46004f5919> in <module>
----> 1 temp_files = dask.compute(*temp_files)
/srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py in compute(*args,**kwargs)
450 postcomputes.append(x.__dask_postcompute__())
451
--> 452 results = schedule(dsk,keys,**kwargs)
453 return repack([f(r,*a) for r,(f,a) in zip(results,postcomputes)])
454
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in get(self,dsk,restrictions,loose_restrictions,resources,sync,asynchronous,direct,retries,priority,fifo_timeout,actors,**kwargs)
2703 Client.compute: Compute asynchronous collections
2704 """
-> 2705 futures = self._graph_to_futures(
2706 dsk,
2707 keys=set(flatten([keys])),
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, user_priority, actors)
2639 {
2640 "op": "update-graph",
-> 2641 "tasks": valmap(dumps_task, dsk),
2642 "dependencies": dependencies,
2643 "keys": list(map(tokey, keys)),
/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/srv/conda/envs/notebook/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
3356 return d
3357 elif not any(map(_maybe_complex,task[1:])):
-> 3358 return {"function": dumps_function(task[0]),"args": warn_dumps(task[1:])}
3359 return to_serialize(task)
3360
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py in dumps_function(func)
3320 result = cache_dumps[func]
3321 except KeyError:
-> 3322 result = pickle.dumps(func,protocol=4)
3323 if len(result) < 100000:
3324 with _cache_lock:
/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x,protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x,**dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s",x,e)
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj,buffer_callback)
71 file,protocol=protocol,buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
75
/srv/conda/envs/notebook/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self,obj)
561 def dump(self,obj):
562 try:
--> 563 return Pickler.dump(self,obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
TypeError: cannot pickle '_thread.lock' object
Can someone help me here and tell me what I am doing wrong? Is there another way to do this kind of parallel processing besides dask?
I have since found that it only throws this error when I upload the files to the S3 bucket; otherwise it works fine. But if I don't save the files to S3, I cannot figure out where they end up. When I run this with dask, the files get saved somewhere I cannot find. I am running my code in JupyterLab, and nothing is saved in any directory.
Solution
It took me a while to parse your code.
In the big function, you use s3fs to interact with your cloud storage, and that works well with xarray.
However, in your main code you use boto3 to list and open the S3 files. Those objects keep a reference to the client object, which maintains a connection pool. That is the thing that cannot be pickled.
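If it helps to see the failure in isolation, here is a minimal sketch (illustrative only, not your exact code) that serializes a function referring to a module-level boto3 resource, which is essentially what Dask has to do with get_temp before it can ship the task to a worker:

import boto3
import cloudpickle

s3 = boto3.resource('s3')

def upload(path):
    # refers to the boto3 resource defined above, just as get_temp does
    s3.Bucket('temp').upload_file(path, 'Output/' + path)

try:
    cloudpickle.dumps(upload)
except TypeError as err:
    print(err)  # typically "cannot pickle '_thread.lock' object" or similar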
s3fs is designed to work with Dask and makes sure that the file system instances and the OpenFile objects are picklable. Since you already use it in one part of the code, I would suggest using s3fs throughout (though of course I am biased, since I am its main author).
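As a rough sketch under your assumptions (bucket 'temp', the first 20 keys, the same slicing of the key into a name), the boto3 listing and reading could be done with s3fs along these lines:

import s3fs

fs = s3fs.S3FileSystem(anon=False)  # use your normal credentials

keys = fs.ls('temp')                # paths come back as 'temp/<key>'
filedata = []
names = []
for key in keys[1:21]:
    with fs.open(key, 'rb') as f:
        filedata.append(f.read())
    names.append(key.split('/')[-1][:-3])  # same idea as keys[i][10:-3]

The upload at the end of get_temp could then use the same kind of file system object, for example fs.put(path[:-3] + '.zip', 'temp/Output/' + path[:-3] + '.zip'), instead of the boto3 resource, so that nothing unpicklable is referenced from inside the task.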
Alternatively, you could pass only the file names (as strings) and not open anything until you are inside the worker function. That would be "best practice" anyway: you should load data in the worker tasks rather than loading it in the client and passing the data around.
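A minimal sketch of that pattern, again assuming the 'temp' bucket layout from your code; get_temp_from_key is a hypothetical wrapper, not something from your code or from Dask:

import dask
import s3fs

def get_temp_from_key(key):
    # everything, including the S3 read, happens on the worker
    fs = s3fs.S3FileSystem(anon=False)
    with fs.open(key, 'rb') as f:
        data = f.read()
    name = key.split('/')[-1][:-3]
    return get_temp(data, name)

fs = s3fs.S3FileSystem(anon=False)
keys = fs.ls('temp')[1:21]

tasks = [dask.delayed(get_temp_from_key)(k) for k in keys]
results = dask.compute(*tasks)

Note that the task still has to serialize get_temp itself, so the upload step inside it also needs to avoid the module-level boto3 resource, for example by creating the client inside the function or by using s3fs there as well.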