如何解决在另一个成对的bin数组中获取数据数组最小值的最快方法
我有三个一维数组:
-
idxs
:索引数据 -
weights
:idxs
中各个指标的权重 -
bins
:用于计算其中最小权重的 bin。
这是我当前使用 idxs
检查名为 weights
的数据在哪个 bin 中的方法,然后计算分箱权重的最小值/最大值:
numpy 方法
import random
import numpy as np
# create example data
out_size = int(10)
bins = np.arange(3,out_size-3)
idxs = np.arange(0,out_size)
#random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
# get which bin idxs belong to
slices = np.digitize(idxs,bins)
# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]
# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]
# get index of each first unque slices
unique_slices,unique_index = np.unique(valid_slices_sort,return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort,unique_index)
# save results
res = np.full((out_size),np.nan)
res[unique_slices-1] = res_sub
print(res)
结果:
array([ 3.,nan,5.,nan])
如果我将 out_size
增加到 1e7 并shuffle 数据,速度(从 np.digitize 到末尾)很慢:
13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
而且,这是每个部分的消耗时间:
np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs,10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs,100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs,100 loops each)
np.digitize()
花费的时间最多:10.8 秒。并且,接下来是 argsort
: 2.02 s。
我还检查了使用 mean
计算 np.histogram
的消耗时间:
counts,_ = np.histogram(idxs,bins=out_size,range=(0,out_size))
sums,out_size),weights = weights,density=False)
mean = sums / np.where(counts == 0,np.nan,counts)
33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs,1 loop each)
这类似于我计算最小值的方法。
scipy 方法
from scipy.stats import binned_statistic
statistics,_,_ = binned_statistic(idxs,weights,statistic='min',bins=bins)
print(statistics)
结果有点不同,但是对于较长的(1e7)混洗数据,速度要慢得多(x6):
array([ 3.,5.])
1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs,1 loop each)
总结
我想找出一个更快的方法。如果该方法也适用于 dask,那就太好了!
用户案例
解决方法
实现此目的的快速方法是使用 dask.dataframe
和 pd.cut
,我首先展示 pandas
版本:
import numpy as np
from scipy.stats import binned_statistic as bs
import pandas as pd
nrows=10**7
df = pd.DataFrame(np.random.rand(nrows,2),columns=['x','val'])
bins = np.linspace(df['x'].min(),df['x'].max(),10)
df['binned_x'] = pd.cut(df['x'],bins=bins,right=False)
result_pandas = df.groupby('binned_x')['val'].min().values
result_scipy = bs(df['x'],df['val'],'min',bins=bins)[0]
print(np.isclose(result_pandas,result_scipy))
# [ True True True True True True True True True]
现在要从 pandas
转到 dask
,您需要确保 bin 跨分区一致,因此请查看 here。一旦每个分区都被一致地装箱,你想要应用所需的操作(最小/最大/总和/计数):
import dask.dataframe as dd
ddf = dd.from_pandas(df,npartitions=10)
def f(df,bins):
df = df.copy()
df['binned_x'] = pd.cut(df['x'],right=False)
result = df.groupby('binned_x',as_index=False)['val'].min()
return result
result_dask = ddf.map_partitions(f,bins).groupby('binned_x')['val'].min().compute()
print(np.isclose(result_pandas,result_dask))
# [ True True True True True True True True True]
在我的笔记本电脑上,第一个代码大约需要 7 3 秒,第二个代码大约快 10 倍(忘了我在重复计算 pandas 和 scipy执行相同的操作)。分区有一定的余地,但这取决于上下文,因此您可以尝试优化数据/硬件。
更新:请注意,此方法适用于最小值/最大值,但对于平均值,您需要计算总和和计数,然后将它们相除。在一次性执行此计算时可能有一种跟踪权重的好方法,但可能不值得增加代码复杂性。
,SultanOrazbayev 展示了一个快速的方法;我会添加一个很酷的。
mask = bins[:,None] == idxs[None,:]
result = np.nanmin(np.where(mask,weights,np.nan),axis=-1)
# Note: may produce (expected) runtime warning if bin has no values
当然,你也可以做np.nanmax
、np.nanmean
等
以上假设您的 bin 确实是单个值。如果它们是范围,则需要稍微多做一些工作来构建掩码
lower_mask = idxs[None,:] >= bins[:,None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1,...] = idxs[None,:] < bins[1:,None]
upper_mask[-1,...] = False
mask = lower_mask & upper_mask
此时您可以像上面一样使用 np.nanmin
。
Ofc np.where
和创建掩码的广播将创建具有各自数据类型的新形状数组 (len(bins),len(idxs))。如果您对此不关心,那么上述内容可能就足够了。
如果这是一个问题(因为您需要 RAM),那么我的第一个建议是购买更多 RAM。如果 - 由于某些愚蠢的原因(例如,钱) - 这不起作用,您可以通过在手动重新跨步到 weights
weights
的副本>
import numpy.ma as ma
mask = ...
restrided_weights = np.lib.stride_tricks.as_strided(weights,shape=(bins.size,idxs.size),strides=(0,idxs.strides[0]))
masked = ma.masked_array(restrided_weights,mask=~mask,fill_value=np.nan,dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)
这避免了 weights
的副本和上述运行时警告。
如果您甚至没有足够的内存来构造 mask
,那么您可以尝试分块处理数据。
最后我检查过,Dask 过去在使用手动跨距数组喂食时有一些有趣的行为。不过,这方面有一些工作,因此您可能需要仔细检查是否已解决,在这种情况下,您可以愉快地将上述内容并行化。
更新基于您对此答案和其他答案的进一步评论:
您可以分块进行此计算,以避免由于大量 bin(数量级为 1e4)而导致的内存问题。将具体数字放入完整示例中并添加进度条表示单核运行时间
import numpy.ma as ma
from tqdm import trange
import numpy as np
import random
# create example data
out_size = int(1.5e5)
#bins = np.arange(3,out_size-3)
bins = np.arange(3,int(3.8e4-3),dtype=np.int64)
idxs = np.arange(0,out_size)
random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
chunk_size = 100
# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1,dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size,len(idxs)),dtype=bool)
result = np.empty_like(bins,dtype=np.float64)
for low in trange(0,len(bins),chunk_size):
high = min(low + chunk_size,len(bins))
chunk_size = high - low
mask_buffer[:chunk_size,...] = ~((bins[low:high,None] <= idxs[None,:]) & (extended_bins[low+1:high+1,None] > idxs[None,:]))
mask = mask_buffer[:chunk_size,...]
restrided_weights = np.lib.stride_tricks.as_strided(weights,shape=mask.shape,idxs.strides[0]))
masked = ma.masked_array(restrided_weights,mask=mask,dtype=np.float64)
result[low:high] = masked.min(axis=-1).filled(np.nan)
奖励:对于 min
和 max
,您可以使用一个很酷的技巧:排序 idxs
和 { {1}} 基于 weights
(最小值升序,最大值降序)。这样,您可以立即查找最小值/最大值,并且可以完全避免屏蔽数组和自定义步幅。这依赖于 weights
的一些没有很好记录的行为,它对布尔数组进行快速传递并且不搜索完整数组。
它仅适用于这两种情况,对于更复杂的事情(平均值),您必须回到上述情况,但对于这两种情况,它又减少了约 70% 并在单核时钟上运行
np.argmax
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。