与itertools和numba并行

如何解决与itertools和numba并行

我从事一个项目已有一段时间了,该项目需要计算一些非常大的数据集,并且很快就超出了我微薄的Excel知识可以处理的范围。在最近的几天里,我开始学习Python,它有助于处理我正在处理的数据量,但是这些数据集的估计处理时间看起来非常长(可能在我的笔记本电脑上可能长达数百年) )。

这里的瓶颈是一个可能产生数万亿或数百万个结果的方程式,因为它正在计算6个不同列表的每种组合,并通过您将在代码中看到的方程式运行它。该代码按原样可以正常工作,但是对于比我包括的示例更大的数据集来说是不可行的。真实的数据集将更像Set1S,2S和3S,每个50个项目,而Sets12A ...每个大约2500个项目(在这种情况下为50x50。这些集合的长度始终等于前三个列表的平方,但是我在这里简单明了。)

我很清楚结果的数量绝对是巨大的,但是我想尽可能地从一个尽可能大的数据集开始,所以我可以看到我可以减少多少输入大小而在绘制时不会对结果产生很大的影响累积百分比直方图。

'Calculator'
import numpy as np
Set1S = np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
Set2S = np.array([1,15])
Set3S = np.array([1,15])
Set12A = np.array([1,15])
Set23A = np.array([1,15])
Set13A = np.array([1,15])
'Define an empty array to add results'
BlockVol = []
from itertools import product 
'itertools iterates through all combinations of lists'
for i,j,k,a,b,c in product(Set1S,Set2S,Set3S,Set12A,Set23A,Set13A):
    'This is the bottleneck equation,with large input datasets'
    BlockVol.append((abs(i*j*k*np.sin(a)*np.sin(b)*np.sin(c))))
arr = np.array(BlockVol)
'manipulate the result list a couple ways'
BlockVol = np.cbrt(BlockVol)
BlockVol = BlockVol*12
'quick check to size of results list'
len(BlockVol)

仅花了时间,我就花了大约3分钟的时间才能获得1130万个结果。

我已经了解了@njit,在最后一天左右有了prange,但是在尝试将我的作品转换为这种格式时有些困惑。我的台式机确实具有不错的GPU,因此我认为可以大大加快速度。我很清楚下面的代码一个垃圾,什么也没做,但是我希望至少我能理解我要做的事情。

似乎要走的路是用我的6个输入列表定义一个函数,但是我只是不确定如何将itertools产品和njit融合在一起。

import numpy as np
from itertools import product
from numba import njit,prange

@njit(parallel = True)
def BlockVolCalc(Set1S,Set13A):
    numRows =Len(Set12A)
    BlockVol = np.zeros(numRows)
    for i,Set13A):
        BlockVol.append((abs(i*j*k*np.sin(a)*np.sin(b)*np.sin(c))))
arr = np.array(BlockVol)
BlockVol = np.cbrt(BlockVol)
BlockVol = BlockVol*12
len(BlockVol)

我们非常感谢您的帮助,因为这都是非常新的内容,而且势不可挡。

谢谢!

解决方法

我仅通过NumPy代码解决了您的任务,如果可能的话,最好只使用NumPy而不是笨重的Numba。下一个仅NumPy的代码将与使用Numba的相同解决方案一样快。

我的代码比参考代码快2800倍,时间是在代码结尾处计算的。

在下一个代码BlockValCalcRef(...)中,功能只是组织为功能的参考代码。 BlockVolCalc(...)是我基于NumPy的函数,应该可以大大提高速度。最后,我声明np.allclose(...),以检查两个解决方案给出的结果是否相同。

我还简化了一些集合的创建,以使用一个N参数来生成集合,在您的现实世界中,您只需提供必要的集合即可。

为了解决任务,我做了几件事:

  1. 我没有为相同的值多次计算np.sin(...),而是为Set12A,Set23A,Set13A预先计算了一次。还要为所有集合预先计算np.abs(...)
  2. 为了计算叉积,我使用了特殊的numpy数组索引方式,例如[None,None,:,None],这使我们可以使用所谓的流行numpy arrays broadcasting

我还想过如何进一步改进代码,使其速度提高6倍,但我认为即使以当前的巨大速度,您也将在几秒钟内填满计算机的整个RAM。接下来是如何改善的想法,当前叉积是对6个数字的每一步积进行计算,而不是这个可以计算K - 1集的乘积,然后将此数组乘以K-th集为了获得K套产品。这将使6有更多的加速时间(因为有6组),因为您只需要一个乘法而不是6

更新:根据上面的段落,我已经实现了功能BlockVolCalc2(...)的第二个改进版本。它具有2800x的加速性能,对于更大的N,可能会更快。

Try it online!

import numpy as np,time

N = 7

Set1S = np.arange(1,N + 1)
Set2S = np.arange(1,N + 1)
Set3S = np.arange(1,N + 1)
Set12A = np.arange(1,N + 1)
Set23A = np.arange(1,N + 1)
Set13A = np.arange(1,N + 1)

def BlockValCalcRef(Set1S,Set2S,Set3S,Set12A,Set13A):
    BlockVol = []
    from itertools import product 
    for i,j,k,a,b,c in product(Set1S,Set13A):
        BlockVol.append((abs(i*j*k*np.sin(a)*np.sin(b)*np.sin(c))))
    return np.array(BlockVol)
    
def BlockVolCalc(Set1S,Set13A):
    Set1S,Set3S = np.abs(Set1S),np.abs(Set2S),np.abs(Set3S)
    Set12A,Set13A = np.abs(np.sin(Set12A)),np.abs(np.sin(Set23A)),np.abs(np.sin(Set13A))
    return (
        Set1S[:,None] *
        Set2S[None,None] *
        Set3S[None,None] *
        Set12A[None,None] *
        Set23A[None,None] *
        Set13A[None,:]
    ).ravel()
    
def BlockVolCalc2(Set1S,np.abs(np.sin(Set13A))
    prod = np.ones((1,),dtype = np.float32)
    for s in reversed([Set1S,Set13A]):
        prod = (s[:,None] * prod[None,:]).ravel()
    return prod

# -------- Testing Correctness and Time Measuring --------
    
tb = time.time()    
a0 = BlockValCalcRef(Set1S,Set13A),t0 = time.time() - tb
print(f'base time {round(t0,4)} sec')

tb = time.time()    
a1 = BlockVolCalc(Set1S,Set13A)
t1 = time.time() - tb
print(f'improved time {round(t1,4)} sec,speedup {round(t0 / t1,2)}x')

tb = time.time()    
a2 = BlockVolCalc2(Set1S,Set13A)
t2 = time.time() - tb
print(f'improved2 time {round(t2,speedup {round(t0 / t2,2)}x')
    
assert np.allclose(a0,a1)
assert np.allclose(a0,a2)

输出:

base time 2.7569 sec
improved time 0.0015 sec,speedup 1834.83x
improved2 time 0.001 sec,speedup 2755.09x

嵌入到您最初的第一个代码中的函数看起来像here in this code

我还创建了基于TensorFlow的代码变体,它将使用您的所有CPU内核和GPU,此代码需要通过python -m pip install --upgrade numpy tensorflow一次安装tensorflow:

import numpy as np

N = 18
Set1S,Set13A = [np.arange(1 + i,N + 1 + i) for i in range(6)]
dtype = np.float32

def Prepare(Set1S,Set13A):
    import numpy as np
    Set12A,Set13A = np.sin(Set12A),np.sin(Set23A),np.sin(Set13A)
    return [np.abs(s).astype(dtype) for s in [Set1S,Set13A]]
    
sets = Prepare(Set1S,Set13A)

def ProcessNP(sets):
    import numpy as np
    res = np.ones((1,dtype = dtype)
    for s in reversed(sets):
        res = (s[:,None] * res[None,:]).ravel()
    res = np.cbrt(res) * 12
    return res

def ProcessTF(sets,*,state = {}):
    if 'graph' not in state:
        import os
        os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
        import numpy as np,tensorflow as tf
        tf.compat.v1.disable_eager_execution()
        cpus = tf.config.list_logical_devices('CPU')
        #print(f"CPUs: {[e.name for e in cpus]}")
        gpus = tf.config.list_logical_devices('GPU')
        #print(f"GPUs: {[e.name for e in gpus]}")
        print(f"GPU: {len(gpus) > 0}")
        state['graph'] = tf.Graph()
        state['sess'] = tf.compat.v1.Session(graph = state['graph'])
        #tf.device(cpus[0].name if len(gpus) == 0 else gpus[0].name)
        with state['sess'].as_default(),state['graph'].as_default():
            res = tf.ones((1,dtype = dtype)
            state['inp'] = []
            for s in reversed(sets):
                sph = tf.compat.v1.placeholder(dtype,s.shape)
                state['inp'].insert(0,sph)
                res = sph[:,:]
                res = tf.reshape(res,(tf.size(res),))
            res = tf.math.pow(res,1 / 3) * 12
            state['out'] = res
        def Run(sets):
            with state['sess'].as_default(),state['graph'].as_default():
                return tf.compat.v1.get_default_session().run(
                    state['out'],{ph: s for ph,s in zip(state['inp'],sets)}
                )
        state['run'] = Run

    return state['run'](sets)

# ------------ Testing ------------

npa,tfa = ProcessNP(sets),ProcessTF(sets)
assert np.allclose(npa,tfa)

from timeit import timeit

print('Nums:',round(npa.size / 10 ** 6,3),'M')
timeit_num = 2
print('NP:',round(timeit(lambda: ProcessNP(sets),number = timeit_num) / timeit_num,'sec')
print('TF:',round(timeit(lambda: ProcessTF(sets),'sec')

在我的2核CPU上打印:

GPU: False
Nums: 34.012 M
NP: 3.487 sec
TF: 1.185 sec

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?