如何解决使用 Dask和/或 xarray并行和延迟地进行短时傅立叶变换频谱分析
问题:
我正在尝试对长时间序列数据进行频谱分析(参见数据结构示例,它基本上是带有时间索引的一维数据)。为了节省时间和内存等。我想并行且懒惰地执行此操作(使用 xarray 和/或 dask)。
最好(或更好)的方法是什么?
我的尝试:
(示例和代码见下文)
-
将 scipy.signal.stft 与 xr.apply_ufunc 结合使用:
问题: ValueError,仅在输入数据为 1 个块时有效,不适用于大数据。 -
使用 scipy.signal.stft 和 dask.array.from_delayed:
问题: 输出数据始终是 1 块,这使得进一步处理数据变得困难。 (之后重新分配内存过载) -
使用 xr.Dataset.rolling.construct 的中间(懒惰)二维转换。这里 1 维是时间,行是我执行 fft(“滚动窗口”)的短时间窗口。
如果有数据:[1,2,3,4,5] 并且滚动窗口为 3,这将变成:
时间索引 滚动窗口 00:00:00 NaN、NaN、1 00:00:01 NaN,1,2 00:00:02 1、2、3 00:00:03 2、3、4 00:00:04 3、4、5 然后使用 xr.apply ufunc 计算这个新数组上每个时间点行(矢量化)的 fft,也是惰性的(具体请参见下面的示例)。 这很有效,而且因为它是惰性计算的,所以也适用于大数据集。
问题:
- 似乎比 scipy.signal.stft 慢(参见下面的基准测试)
- overlap/stepsize 不能改变(就像 scipy stft 中的 nooverlap)。
- 真的需要中间步骤还是有一种方法可以立即计算滚动窗口 fft?
- 在某些情况下仍会导致 RAM 过载
其他:我还尝试了其他方法,例如使用 numba 优化函数(但这不适用于 np.fft.fft),但为了保持这篇已经很长的帖子简短,我只包括我尝试过的最有前途的方法。
我该怎么做才能做到这一点,和/或提高方法 3 的性能或使用方法 1 或 2,或者使用 xarray / dask 进行其他延迟和并行处理?
谢谢!
代码/示例:
导入和测试数据集:
import xarray as xr
import dask.array as da
import dask
import numpy as np
from scipy import signal as ss
# variables
fs= 128 # rec frequency
_l = int(1e3) # data length
window = fs * 10 # window over which to do stft
# Create data
data = np.random.random(_l * fs)
data = xr.DataArray(da.from_array(data,name="x")).assign_coords({"dim_0":np.arange(_l * fs)})
data = data.chunk(chunks=_l/10 * fs) # create multiple chunks to simulate bigger data
data = data.to_dataset()
方法一:
def stft(data,):
f,t,Zxx = ss.stft(data,fs=fs,nperseg=window,noverlap= window - 1
)
return np.abs(Zxx)
data['fft'] = xr.apply_ufunc(stft,data.x,input_core_dims=[['dim_0']],output_core_dims=[["f","t"]],# define newly created output dims
dask="parallelized",output_dtypes=['f8'],output_sizes={"f": window / 2 + 1,"t": data.x.size - 1},)
输出:
ValueError: dimension 'dim_0' on 0th function argument to apply_ufunc with dask='parallelized' consists of multiple chunks,but is also a core dimension. To fix,rechunk into a single dask array chunk along this dimension,i.e.,``.chunk({'dim_0': -1})``,but beware that this may significantly increase memory usage.
方法二:
def stft1(x):
f,Zxx = ss.stft(x,noverlap=window - 1
)
return np.abs(Zxx)
da.from_delayed(dask.delayed(stft1)(data.x),shape=(window / 2 + 1,data.x.size - 1),Meta="f8")
输出:
方法 3:(有效但不是最佳的?)
def xr_make_fft(ds,par):
ds['roll_window'] = np.arange(window) # create dimension for rolling window
# FFT function to apply vectorized:
def xr_fft(x):
fft = np.fft.fft(x)[xr_fft.idx]
return np.abs(fft)
# make new parameter with rolling windows stacked
ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",)
# Calc FFT Freq domain
fftfreq = np.fft.fftfreq(window,1 / fs)
idx = (0 < fftfreq)
freq = fftfreq[idx]
ds['Frequency'] = freq
xr_fft.idx = idx
ds[f'FFT'] = xr.apply_ufunc(xr_fft,ds[f"FFT_window"],vectorize=True,input_core_dims=[["roll_window"]],# define input dim over which to vectorize (this dim in inserted completely)
output_core_dims=[["Frequency"]],# define newly created output dims
dask="parallelized",output_sizes={"Frequency": len(freq)},)
ds = ds.drop(f'FFT_window').drop_dims('roll_window')
return ds
data = xr_make_fft(data,"x")
输出:
在小数据集上的性能测试: 128e3 长度数据(最大的数据集,没有超载 RAM):
方法二:
%timeit da.from_delayed(dask.delayed(stft1)(data.x),Meta="f8").compute()
3.53 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
方法 3:
%timeit data.FFT.compute()
6.04 s ± 219 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
scipy.signal.stft
%timeit ss.stft(data.x,fs,noverlap=window-1)[2]
1.87 s ± 8.28 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
解决方法
好吧,我终于想出了一种方法让它在大型数据集上工作:
(下面的工作代码)
为了使方法 3 适用于大型数据集,请确保为使用 2d 转换创建的(大得多)块(本例中为 1.31 TB,数据长度为 128e7)调整块大小 ):
在我的系统(16GB RAM)上使用示例中的参数,24000 块是最佳的(245.76MB)。最佳块大小会随着不同的频率和窗口而变化。
工作代码(方法 3):
def xr_make_fft(ds,par):
ds['roll_window'] = np.arange(window) # create dimension for rolling window
# FFT function to apply vectorized:
def xr_fft(x):
fft = np.fft.fft(x)[xr_fft.idx]
return np.abs(fft)
# make new parameter with rolling windows stacked
ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",).chunk({'dim_0': 24000})
# Calc FFT Freq domain
fftfreq = np.fft.fftfreq(window,1 / fs)
idx = (0 < fftfreq)
freq = fftfreq[idx]
ds['Frequency'] = freq
xr_fft.idx = idx
ds[f'FFT'] = xr.apply_ufunc(xr_fft,ds[f"FFT_window"],vectorize=True,input_core_dims=[["roll_window"]],# define input dim over which to vectorize (this dim in inserted completely)
output_core_dims=[["Frequency"]],# define newly created output dims
dask="parallelized",output_dtypes=['f8'],output_sizes={"Frequency": len(freq)},)
ds = ds.drop(f'FFT_window').drop_dims('roll_window')
return ds
data = xr_make_fft(data,"x")
我觉得还有优化的空间,所以如果你有建议,请在这里发表:) 此外,我仍在寻找一种使 scipy sftf 工作的方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。