微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

熊猫:参差不齐的时间序列上的时间加权滚动平均值

如何解决熊猫:参差不齐的时间序列上的时间加权滚动平均值

我有一个参差不齐的(意为非常规频率),按时间索引的DataFrame,我想对其进行时间加权滚动平均,以保持该DataFrame的原始索引。假设一个记录值在被另一个值取代之前是有效的。实现此目的的一种方法是将衣衫agged的DataFrame向上采样到一个统一的频率,然后进行滚动平均:

import pandas as pd
import numpy as np


def time_weighted_average_using_upsampling(df: pd.DataFrame,avg_window: str) -> pd.DataFrame:
    # Leads to high memory usage
    original_index = df.index.copy()
    avg = (
        df.resample("1s")
        .ffill()
        .rolling(avg_window,closed="left",min_periods=int(avg_window[0])))
        .mean()
        .reindex(original_index)
    )
    return avg


if __name__ == "__main__":
    df = pd.DataFrame(
        {"A": [0,1,2,3,4,5]},index=[
            pd.Timestamp("20130101 09:00:00"),pd.Timestamp("20130101 09:00:02"),pd.Timestamp("20130101 09:00:03"),pd.Timestamp("20130101 09:00:05"),pd.Timestamp("20130101 09:00:06"),pd.Timestamp("20130101 09:00:10"),],)

    expected_avg = pd.DataFrame(
        {"A": [np.nan,np.nan,1 / 3,5 / 3,7 / 3,4]},)

    pd.testing.assert_frame_equal(
        time_weighted_average_using_upsampling(df=df,avg_window="3s"),expected_avg
    )

与此相关的问题是,向上采样无法达到衣衫的df提供的稀疏表示的目的。稀疏表示可提高内存效率,而上采样版本则不能。这就引出了一个问题:如何在没有对整个df进行向上采样的情况下达到 以上所示的结果?

解决方法

这里是一种替代方法,您可以先检查两行之间的时间差大于间隙的位置,而不是对整个数据帧进行上采样。然后将这些带有间隔的行中的3删除,并将reindex df与这些特定的新时间戳的并集删除。一旦创建了这些行,就可以groupby使用添加新索引的位置,resample每组1s,最后rolling用您所使用的方法。 Reindex以df结尾。

rule = 3
rolling_win = f'{rule}s'

sparse = df.index.to_series().diff().dt.total_seconds().ge(rule)
new_timestamps = df.index[sparse] - pd.Timedelta(seconds=rule)
print(new_timestamps) 
#DatetimeIndex(['2013-01-01 09:00:07'],dtype='datetime64[ns]',freq=None)

#reindex with the new 
df_ = df.reindex(df.index.union(new_timestamps))

#perform first the resample 1s per group,then clean the dataframe to do the rolling.mean
#finally reindex like original df
df_ = (df_.groupby(df_.index.isin(new_timestamps).cumsum())
          .resample("1s").ffill()
          .reset_index(level=0,drop=True).ffill()
          .rolling(rolling_win,closed="left",min_periods=rule)\
          .mean()
          .reindex(df.index)
      )
print(df_)
                            A
2013-01-01 09:00:00       NaN
2013-01-01 09:00:02       NaN
2013-01-01 09:00:03  0.333333
2013-01-01 09:00:05  1.666667
2013-01-01 09:00:06  2.333333
2013-01-01 09:00:10  4.000000

在这种情况下,由于间隙实际上很小,所以并不是很有趣,但是如果间隙很大,那么它将变得有用。

EDIT或其他选择,可能更好,union由您删除1s,2s,3s ...(取决于规则)的原始索引构成的所有索引。现在,reindexffillrolling.mean仅具有滚动所需的索引。最后结果相同

from functools import reduce

rule = 3
rolling_win = f'{rule}s'

idx = df.index
df_ = (df.reindex(reduce(lambda x,y: x.union(y),[idx - pd.Timedelta(seconds=i) 
                          for i in range(0,rule+1)]))
         .ffill()
         .rolling(rolling_win,min_periods=rule)\
         .mean()
         .reindex(df.index)
        )
,

受@ Ben.T启发的两种可能的解决方案:

configure -release -opensource -confirm-license -opengl es2 -device linux-rasp-pi-g++ -device-option CROSS_COMPILE=/opt/cross-pi-gcc/bin/arm-linux-gnueabihf- -sysroot $HOME/rpi/rootfs -prefix /usr/local/qt-5.15.0-rpi -extprefix $HOME/rpi/qt-5.15.0-rpi -hostprefix $HOME/rpi/tools -make libs  -no-use-gold-linker -skip qtwayland -skip qtlocation -skip qtscript -v -no-gbm -ssl

第一个窗口一次对单个滚动窗口进行上采样,而后者实际上通过确保在我们关心的窗口的开头始终有一个可用点来进行时间加权平均。这是通过将原始索引随窗口长度偏移而完成的。

我尚未评估相关案件的表现。

编辑: 我决定在具有约100,000行并使用20分钟窗口的第二个分辨率数据集上测试功能(!)两种变体都非常缓慢,但我认为我有一个新的赢家:

def time_weighted_average_using_local_upsampling(df: pd.DataFrame,avg_window: str) -> pd.DataFrame:
    """Uses second resolution up-sampling only on smaller windows at a time."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1,freq=avg_window)),method="ffill")
        .rolling(avg_window,closed="both",min_periods=2)
        .apply(lambda x: x.resample("1s").ffill()[:-1].mean(skipna=False))
        .reindex(original_index)
    )
    return avg


def time_weighted_average_using_index_weighting(df: pd.DataFrame,avg_window: str) -> pd.DataFrame:
    """Uses weighting by duration,by ensuring every window has a point at the start."""
    original_index = df.index.copy()
    avg = (
        df.reindex(df.index.union(df.index.shift(periods=-1,min_periods=2)
        .apply(lambda x: np.average(x[:-1],weights=x.index.to_series().diff()[1:].dt.seconds))
        .reindex(original_index)
    )
    return avg

这是在滚动之前预先进行加权的方法,因此我们不用使用def time_weighted_average_using_index_weighting2(df: pd.DataFrame,by ensuring every window has a point at the start.""" original_index = df.index.copy() avg = df.reindex(df.index.union(df.index.shift(periods=-1,method="ffill") avg = ( avg.multiply(avg.index.to_series().diff().dt.seconds.shift(-1),axis=0) .divide(pd.Timedelta(avg_window).seconds) .rolling(avg_window,closed="left") .sum() .reindex(original_index) ) avg[~((avg.index - pd.Timedelta(avg_window)) >= original_index[0])] = np.nan return avg 而不是.sum()。这转化为极大的速度提高。无论平均窗口的大小如何,我们最多还可以避免将索引加倍。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。