微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Pyspark 与 sklearn 和数据帧类型转换

如何解决Pyspark 与 sklearn 和数据帧类型转换

我正在尝试将 sklearn 与 pyspark 结合使用,但我遇到了一些性能问题。假设我有一个数据集,它已经通过了一个管道,其中特征被矢量化和规范化。为了使用 sklearn,我必须为算法提供一个数组或一个 Pandas 数据框。考虑到我的数据集的大小(2.8M + 并且应该很快会变大),将训练集转换为 Pandas 非常缓慢,所以我使用的是 numpy 方法

train =  np.array(train.select('features').collect()).squeeze()

这相对较慢,因为我需要使用 collect 将数据推回驱动程序。有没有其他更快更好的方法?此外,由于问题的性质,我目前以非标准方式处理我的评分函数

def score (fitmodel,test):
   predY = fitmodel.score_samples(test)
   return np.full((1,len(predY)),np.mean(predY)).transpose()

这个想法是计算预测的平均值,然后返回一个数组,该数组的重复次数与测试的记录数相同。例如。如果我的测试集有 450 条记录,我将返回一个形状为 (450,1) 的数组,其中所有 450 条记录都具有相同的值(预测的平均值)。虽然很慢,但到目前为止一切顺利,一切都按预期进行。我的问题是我需要通过多次执行此操作(更改测试集)来继续进行测试,并将结果附加到单个数组中,以便稍后评估模型的性能。我的代码

for _ in tdqm(range(450,800)):
    #Get group #X
    _test = df.where(col('index') == _) #Get a different "chunk" of the dataset each iteration
    _test.coalesce(2) 
    #Apply pipeline with transforms
    test = pipelineModel.transform(_test)
    y_test = np.array(test.select('label').collect())
    x_test = np.array(test.select('features').collect()).squeeze()
    pred = newscore(x_test,model)
    
    if(_ == (450)): #First run
        trueY = y_test
        predY = pred
    else:
        trueY = np.append(trueY,y_test)
        predY = np.append(predY,pred)

简而言之,我获取数据集的特定部分,对其进行测试,然后我想附加预测和“真实”标签以供以后评估。我的主要问题是做 2 个收集和一个 np.append 需要很长时间,我需要找到替代方案。测试整个测试集(大约 40 万个条目)需要大约 1 分钟,但所有这些都到位后,时间增加到 2 小时 20 分钟。 最重要的是,我必须将数组转换回 pyspark 数据帧以使用 mllib 评估函数,这会为该过程增加更多时间。

说了这么多,有人能指出我可以以更有效的方式完成这项工作的方向吗?也许还有另一种使用 spark 和 sklearn 的方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。