如何解决PySpark 中使用pickled MLFlow 模型和pandas_udf 的预测
我有一个通过随机搜索找到的 LightGBM 模型,该模型使用 MLFlow 保存到 .pkl 文件中。目标是将该腌制模型加载到 Pyspark 中并在那里进行预测。用简单的 unpickling 就可以做到这一点:
with open(path,'rb') as f:
model = pickle.load(f)
然后应用pandas_udf:
import pyspark.sql.functions as F
@F.pandas_udf(returnType=DoubleType())
def predict_udf(*cols):
df = pd.concat(cols,axis=1)
return pd.Series(model.predict(df))
cols = columns_list
y_pred = X_pred.select(F.col('id'),predict_udf3(*cols).alias('prediction'))
如果我尝试显示、计算或保存输出,它会返回:
org.apache.spark.SparkException:作业因阶段失败而中止:阶段 18.0 中的任务 1 失败 1 次,最近失败:阶段 18.0 中丢失任务 1.0(TID 217,本地主机,执行程序驱动程序):org.apache .spark.SparkException:Python worker 意外退出(崩溃) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486)
model.predict() 部分本身在 python 中工作正常。如果我用更简单的东西替换上面 pandas_udf 函数中的 model.predict() ,例如设置一个常数,然后它就可以正常工作了,所以我认为问题不应该出在版本上,因为一两年前有人报告说 pyarrow、numpy 和 pandas 之间存在特定的工作组合,主要是由版本的变化引起的pyarrow (0.15+)。我目前正在使用:
pandas==0.25.1 (downgraded from 1.2.4)
numpy==1.17.2 (downgraded from 1.20.3)
pyarrow==0.14.0 (tried all versions between 0.14.0 and 4.0.0)
之前有报道说旧的 numpy(例如 0.14)可以解决这个问题,但现在这对我不起作用,因为如果 numpy 太旧,解压模型会失败。所以,有几个问题:
- 是否有可能将 MLFlow 模型解压到 Spark 中,然后执行 model.predict()?
- pandas_udf 是否正确完成?
- pandas_udf 实际上是最有效的方法吗?预计会进行数百万次预测,而且运行时间至关重要。
提前致谢!
解决方法
我自己解决了这个问题 - 如果 unpickled 对象太复杂(例如随机搜索、管道和模型),它会失败。如果我只是声明模型,拟合它然后腌制它,那么取消腌制和进行预测就没有问题。 确实,库的版本之间存在一些不兼容,但以下确实有效:
pyspark==2.4.3
pyarrow==0.14.0
pandas==1.2.4
numpy==1.17.2
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。