微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Dask和/或 xarray并行和延迟地进行短时傅立叶变换频谱分析

如何解决使用 Dask和/或 xarray并行和延迟地进行短时傅立叶变换频谱分析

问题:

我正在尝试对长时间序列数据进行频谱分析(参见数据结构示例,它基本上是带有时间索引的一维数据)。为了节省时间和内存等。我想并行且懒惰地执行此操作(使用 xarray 和/或 dask)。
最好(或更好)的方法是什么?

我的尝试:

(示例和代码见下文)

  1. 将 scipy.signal.stft 与 xr.apply_ufunc 结合使用:
    问题: ValueError,仅在输入数据为 1 个块时有效,不适用于大数据。

  2. 使用 scipy.signal.stft 和 dask.array.from_delayed:
    问题: 输出数据始终是 1 块,这使得进一步处理数据变得困难。 (之后重新分配内存过载)

  3. 使用 xr.Dataset.rolling.construct 的中间(懒惰)二维转换。这里 1 维是时间,行是我执行 fft(“滚动窗口”)的短时间窗口。

    如果有数据:[1,2,3,4,5] 并且滚动窗口为 3,这将变成:

    时间索引 滚动窗口
    00:00:00 NaN、NaN、1
    00:00:01 NaN,1,2
    00:00:02 1、2、3
    00:00:03 2、3、4
    00:00:04 3、4、5

    然后使用 xr.apply ufunc 计算这个新数组上每个时间点行(矢量化)的 fft,也是惰性的(具体请参见下面的示例)。 这很有效,而且因为它是惰性计算的,所以也适用于大数据集。

    问题

    • 似乎比 scipy.signal.stft 慢(参见下面的基准测试)
    • overlap/stepsize 不能改变(就像 scipy stft 中的 nooverlap)。
    • 真的需要中间步骤还是有一种方法可以立即计算滚动窗口 fft?
    • 在某些情况下仍会导致 RAM 过载

其他:我还尝试了其他方法,例如使用 numba 优化函数(但这不适用于 np.fft.fft),但为了保持这篇已经很长的帖子简短,我只包括我尝试过的最有前途的方法

我该怎么做才能做到这一点,和/或提高方法 3 的性能或使用方法 1 或 2,或者使用 xarray / dask 进行其他延迟和并行处理?

谢谢!

代码/示例:

导入和测试数据集:

import xarray as xr
import dask.array as da
import dask
import numpy as np
from scipy import signal as ss

# variables
fs= 128           # rec frequency
_l = int(1e3)     # data length
window = fs * 10  # window over which to do stft


# Create data 
data = np.random.random(_l * fs)
data = xr.DataArray(da.from_array(data,name="x")).assign_coords({"dim_0":np.arange(_l * fs)})
data = data.chunk(chunks=_l/10 * fs)   # create multiple chunks to simulate bigger data
data = data.to_dataset()

方法一:

def stft(data,):
    f,t,Zxx = ss.stft(data,fs=fs,nperseg=window,noverlap= window - 1
                        )    
    return np.abs(Zxx)

data['fft'] = xr.apply_ufunc(stft,data.x,input_core_dims=[['dim_0']],output_core_dims=[["f","t"]],# define newly created output dims
              dask="parallelized",output_dtypes=['f8'],output_sizes={"f": window / 2 + 1,"t": data.x.size - 1},)

输出

ValueError: dimension 'dim_0' on 0th function argument to apply_ufunc with dask='parallelized' consists of multiple chunks,but is also a core dimension. To fix,rechunk into a single dask array chunk along this dimension,i.e.,``.chunk({'dim_0': -1})``,but beware that this may significantly increase memory usage.

方法二:

def stft1(x):
    f,Zxx = ss.stft(x,noverlap=window - 1
                        )    
    return np.abs(Zxx)
    
da.from_delayed(dask.delayed(stft1)(data.x),shape=(window / 2 + 1,data.x.size - 1),Meta="f8")

输出

Method 2 output

方法 3:(有效但不是最佳的?)

def xr_make_fft(ds,par):
    ds['roll_window'] = np.arange(window)  # create dimension for rolling window
    
    # FFT function to apply vectorized:
    def xr_fft(x):
        fft = np.fft.fft(x)[xr_fft.idx]
        return np.abs(fft)
    
    # make new parameter with rolling windows stacked
    ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",)
    
    # Calc FFT Freq domain
    fftfreq = np.fft.fftfreq(window,1 / fs)
    idx = (0 < fftfreq)
    freq = fftfreq[idx]
    ds['Frequency'] = freq
    xr_fft.idx = idx
    
    ds[f'FFT'] = xr.apply_ufunc(xr_fft,ds[f"FFT_window"],vectorize=True,input_core_dims=[["roll_window"]],# define input dim over which to vectorize (this dim in inserted completely)
                                output_core_dims=[["Frequency"]],# define newly created output dims
                                dask="parallelized",output_sizes={"Frequency": len(freq)},)
    
    ds = ds.drop(f'FFT_window').drop_dims('roll_window')
    return ds

data = xr_make_fft(data,"x")

输出

method 3 output

在小数据集上的性能测试: 128e3 长度数据(最大的数据集,没有超载 RAM):

方法二:

%timeit da.from_delayed(dask.delayed(stft1)(data.x),Meta="f8").compute()
3.53 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

方法 3:

%timeit data.FFT.compute() 
6.04 s ± 219 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

scipy.signal.stft

%timeit ss.stft(data.x,fs,noverlap=window-1)[2]
1.87 s ± 8.28 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

解决方法

好吧,我终于想出了一种方法让它在大型数据集上工作:
(下面的工作代码)

为了使方法 3 适用于大型数据集,请确保为使用 2d 转换创建的(大得多)块(本例中为 1.31 TB,数据长度为 128e7)调整块大小 ):

enter image description here

在我的系统(16GB RAM)上使用示例中的参数,24000 块是最佳的(245.76MB)。最佳块大小会随着不同的频率和窗口而变化。

enter image description here

工作代码(方法 3):

def xr_make_fft(ds,par):
    ds['roll_window'] = np.arange(window)  # create dimension for rolling window
    
    # FFT function to apply vectorized:
    def xr_fft(x):
        fft = np.fft.fft(x)[xr_fft.idx]
        return np.abs(fft)
    
    # make new parameter with rolling windows stacked
    ds['FFT_window'] = ds[par].rolling(dim_0=window).construct("roll_window",).chunk({'dim_0': 24000})
    
    # Calc FFT Freq domain
    fftfreq = np.fft.fftfreq(window,1 / fs)
    idx = (0 < fftfreq)
    freq = fftfreq[idx]
    ds['Frequency'] = freq
    xr_fft.idx = idx
    
    ds[f'FFT'] = xr.apply_ufunc(xr_fft,ds[f"FFT_window"],vectorize=True,input_core_dims=[["roll_window"]],# define input dim over which to vectorize (this dim in inserted completely)
                                output_core_dims=[["Frequency"]],# define newly created output dims
                                dask="parallelized",output_dtypes=['f8'],output_sizes={"Frequency": len(freq)},)
    
    ds = ds.drop(f'FFT_window').drop_dims('roll_window')
    return ds

data = xr_make_fft(data,"x")

我觉得还有优化的空间,所以如果你有建议,请在这里发表:) 此外,我仍在寻找一种使 scipy sftf 工作的方法。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。