微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为数据数组中的每个网格提取时间序列数据

如何解决为数据数组中的每个网格提取时间序列数据

我正在处理具有时间、纬度和经度维度的数据数组。可以使用以下代码重现数据:

import numpy as np
from datetime import timedelta
import datetime
import xarray as xr

precipitation = 10 * np.random.rand(20,40,2880)
lon = range(20)
lat = range(40)
time = np.arange('2017-06-01','2017-07-31',timedelta(minutes=30),dtype='datetime64[ns]')
data =xr.DataArray(
                   data=precipitation,dims=["lon","lat","time"],coords=[lon,lat,time])    
print (data)

我愿意为数据数组中的每个网格提取时间序列数据并将其存储在一个 csv 文件中。

这是我目前尝试过的:

stacked_data = data.stack(z=("lon","lat"))
for index in stacked_data.indexes["z"]:
    data = stacked_data.sel(z=index,drop=True)
    data_df=data.to_dataframe(name =str(index))
    data_df.to_csv("time series"+str(index)+".txt")

方法适用于此处提供的示例。然而,在处理从时间序列光栅文件创建的大数据数组时需要很长时间(“时间”很多年,数千个“经度”和数千个“纬度”)。

有没有其他方法可以快速完成这项工作?

谢谢!

解决方法

如果您的工作流程很简单并且您在一台机器上处理,我建议使用 concurrent.futures 使用多个进程将数据转储到 csv:


from concurrent.futures import ProcessPoolExecutor

def dump_to_csv(x,ix):
    x = x.to_dataframe(name=str(ix))
    x.to_csv("temp/time series"+str(ix)+".txt")
    return True

with ProcessPoolExecutor(max_workers=4) as pool:
    res = [pool.submit(dump_to_csv,stacked_data.sel(z=ix,drop=True),ix) for ix in stacked_data.indexes["z"]]

当然,您可以根据您的要求/硬件调整并发进程的数量。您还可以实现一种更复杂的方法来检查文件是否已写入,就像在我的快速示例中一样,您会在 True 中返回 future

如果您的规模更大和/或涉及多台机器,您可以查看 dask.distributeddask.delayed。通过这种方式,您可以在工作人员之间传播数据,然后将其同时写入磁盘。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。