微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用矢量化在 python 中加速这个 DP 函数

如何解决如何使用矢量化在 python 中加速这个 DP 函数

所以我在这里有这个定义,

DP[i,j] = f[i,j] + min(DP[i−1,j −1],DP[i−1,j],j +1])

定义了从 NxM 矩阵顶部到矩阵底部的最小应计成本。 f 中的每个单元格代表从另一个单元格到该单元格的值/成本(1.2、0、10 等)。

矩阵可能很大(1500x1500,它是 Gradient map of an image),我编写的 DP 算法对于我的矩阵每次运行大约需要 1 秒。这个矩阵每次执行需要运行数百次,所以总的程序运行时间需要几分钟。这个循环大约是我瓶颈的 99%,所以我试图用 Python/numpys 向量化方法优化这个循环。我只能访问 Numpy 和 Scipy。

注意:我几乎不使用 python 编程,所以解决方案可能只是明显的 idk。

第一次尝试,只是简单的循环,每次运行时间大约为 2-2.5 秒

DP = f.copy()
for r in range(2,len(DP) - 1): # Start at row 2 since row one doesn't change
    for c in range(1,len(DP[0]) - 1):
        DP[r][c] += min(DP[r - 1,c-1:c+2])

第二次尝试,我尝试利用一些 numpy 矢量化函数“fromiter”一次计算整行,而不是逐列计算,每次运行的时间约为 1-1.5 秒。我的目标是至少快一个数量级,但我不知道我还能如何优化它。

DP = f.copy()
for r in range(2,len(DP) - 1):
    def foo(arr):
        idx,val = arr
        if idx == 0 or idx == len(DP[[0]) - 1:
            return np.inf
        return val + min(DP[r - 1,idx - 1],DP[r - 1,idx],idx + 1])


    DP[r,:] = np.fromiter(map(foo,enumerate(DP[r,:])))

解决方法

正如 hpaulj 所说,由于您的问题本质上是连续的,因此很难完全矢量化,尽管这似乎是可能的(每个单元格都根据行 r=2 的值进行更新,不同之处在于所考虑的三元组数来自以下每一行的第 2 行)所以也许您可以找到一种聪明的方法来做到这一点!

话虽如此,一个快速且半矢量化的解决方案是使用 user42541 提出的执行 sliding windows with fancy indexing 的简洁方式,因此我们用矢量化调用替换了内部循环:

indexer = np.arange(3)[:,None] + np.arange(DP.shape[1] - 2)[None,:]
for r in range(2,DP.shape[0] - 1):
    DP[r,1:-1] += np.min(DP[r-1,indexer],axis = 0)

对于 1500x1500 的整数数组,相对于您的双循环方法(您的矢量化解决方案在我的电脑中不起作用),这会导致大约两个数量级的加速。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。