微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Pyspark 标准定标器 - 排除均值计算的空值 在拟合管道之前过滤掉 None 值用常数值替换缺失值使用输入法在没有 StandardScaler 的情况下缩放所需的列

如何解决Pyspark 标准定标器 - 排除均值计算的空值 在拟合管道之前过滤掉 None 值用常数值替换缺失值使用输入法在没有 StandardScaler 的情况下缩放所需的列

我正在尝试将 standardScaler 用于 sparkML 库,用于具有空值列的数据框。我想保留空值,但是当我使用带有均值的标准缩放器时,具有空值的列的均值也变为空值。有没有办法让标准缩放器跳过均值计算的空值(如向量汇编程序中的 handleInvalid 选项)?

下面是代码示例

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getorCreate() 

test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),4,8),5,7),6,7,1),8,6),9,9),10,11,12)],schema=['col1','col2','col3'])
#%%

from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel

assmbler  = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)

在此之后,如果我尝试获得平均值。

pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5,nan])

如您所见,第二列的平均值是 nan。有什么办法可以避免这种情况?

解决方法

Spark 的 StandardScaler 的 fit method 使用 a e 0 4 来计算列的均值:

Summarizer.metrics("mean","std")

Summarizer class 本身无法忽略 val Row(mean: Vector,std: Vector) = dataset .select(Summarizer.metrics("mean","std").summary(col($(inputCol))).as("summary")) .select("summary.mean","summary.std") .first() null/NaN 值,因此没有针对该问题的内置解决方案。

有几个选项可以处理这个问题:

在拟合管道之前过滤掉 None 值

None

用常数值替换缺失值

test_df = test_df.filter("not col2 is null and not col3 is null")

使用输入法

向管道添加 Imputer 以使用特征的均值、中值或最频繁值替换缺失值:

test_df = test_df.fillna(0) #or any other value that is appropriate for the task

from pyspark.ml.feature import Imputer imputer = Imputer(inputCols=['col2','col3'],outputCols=['col2i','col3i']) assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep') sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled') pipeline = Pipeline(stages=[imputer,assmbler,sclr]) pipe_fit= pipeline.fit(test_df) df_res = pipe_fit.transform(test_df) 现在返回

pipe_fit.stages[2].mean

因为 DenseVector([6.5,6.3]) 中的缺失值已替换为该列的平均值。

对于 strategy parameter,可以使用中位数或最常见的值代替均值,但使用均值是默认值。

在没有 StandardScaler 的情况下缩放所需的列

使用标准 Spark SQL 函数 meanstddev 可以实现类似 StandardScaler 的逻辑。两个 SQL 函数都能很好地处理 col3 值。

None

此逻辑将两列 cols = ['col2','col3'] # the columns that should be scaled mean_and_std_cols=[c for col in cols for c in (F.mean(col).alias(f"{col}_mean"),F.stddev(col).alias(f"{col}_std"))] mean_and_std = test_df.select(mean_and_std_cols).first() scaled_cols=[((F.col(col) - mean_and_std[f"{col}_mean"]) /mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols] test_df = test_df.select(test_df.columns + scaled_cols) col2_s 添加到包含缩放值的数据框。 col3_s 包含均值和标准差的实际值:

mean_and_std

新创建的列 Row(col2_mean=6.5,col2_std=3.0276503540974917,col3_mean=6.333333333333333,col3_std=3.4641016151377544) col2_s 现在可以用作 VectorAssembler 的输入列:

col3_s

在大型数据集上,此选项可能比原始缩放器慢一点,因为均值和标准差的值不是近似值而是精确计算的。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。