如何解决Pyspark 标准定标器 - 排除均值计算的空值 在拟合管道之前过滤掉 None 值用常数值替换缺失值使用输入法在没有 StandardScaler 的情况下缩放所需的列
我正在尝试将 standardScaler 用于 sparkML 库,用于具有空值列的数据框。我想保留空值,但是当我使用带有均值的标准缩放器时,具有空值的列的均值也变为空值。有没有办法让标准缩放器跳过均值计算的空值(如向量汇编程序中的 handleInvalid 选项)?
下面是代码示例
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
sqlContext = SparkSession.builder.appName('test').config("spark.submit.deployMode","client").enableHiveSupport().getorCreate()
test_df = sqlContext.createDataFrame([(1,2,None),(1,3,3),4,8),5,7),6,7,1),8,6),9,9),10,11,12)],schema=['col1','col2','col3'])
#%%
from pyspark.ml.feature import StringIndexer,VectorIndexer,VectorAssembler,StandardScaler
from pyspark.ml import Pipeline,PipelineModel
assmbler = VectorAssembler(inputCols=['col2','col3'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
在此之后,如果我尝试获得平均值。
pipe_fit.stages[1].mean
Out[5]: DenseVector([6.5,nan])
如您所见,第二列的平均值是 nan。有什么办法可以避免这种情况?
解决方法
Spark 的 StandardScaler 的 fit method 使用 a e
0 4
来计算列的均值:
Summarizer.metrics("mean","std")
Summarizer class 本身无法忽略 val Row(mean: Vector,std: Vector) = dataset
.select(Summarizer.metrics("mean","std").summary(col($(inputCol))).as("summary"))
.select("summary.mean","summary.std")
.first()
或 null
/NaN
值,因此没有针对该问题的内置解决方案。
有几个选项可以处理这个问题:
在拟合管道之前过滤掉 None 值
None
用常数值替换缺失值
test_df = test_df.filter("not col2 is null and not col3 is null")
使用输入法
向管道添加 Imputer 以使用特征的均值、中值或最频繁值替换缺失值:
test_df = test_df.fillna(0) #or any other value that is appropriate for the task
from pyspark.ml.feature import Imputer
imputer = Imputer(inputCols=['col2','col3'],outputCols=['col2i','col3i'])
assmbler = VectorAssembler(inputCols=['col2i','col3i'],outputCol='col_vec',handleInvalid='keep')
sclr = StandardScaler(withMean=True,inputCol='col_vec',outputCol='col_scaled')
pipeline = Pipeline(stages=[imputer,assmbler,sclr])
pipe_fit= pipeline.fit(test_df)
df_res = pipe_fit.transform(test_df)
现在返回
pipe_fit.stages[2].mean
因为 DenseVector([6.5,6.3])
中的缺失值已替换为该列的平均值。
对于 strategy parameter,可以使用中位数或最常见的值代替均值,但使用均值是默认值。
在没有 StandardScaler 的情况下缩放所需的列
使用标准 Spark SQL 函数 mean 和 stddev 可以实现类似 StandardScaler 的逻辑。两个 SQL 函数都能很好地处理 col3
值。
None
此逻辑将两列 cols = ['col2','col3'] # the columns that should be scaled
mean_and_std_cols=[c for col in cols for c in
(F.mean(col).alias(f"{col}_mean"),F.stddev(col).alias(f"{col}_std"))]
mean_and_std = test_df.select(mean_and_std_cols).first()
scaled_cols=[((F.col(col) - mean_and_std[f"{col}_mean"])
/mean_and_std[f"{col}_std"]).alias(f"{col}_s") for col in cols]
test_df = test_df.select(test_df.columns + scaled_cols)
和 col2_s
添加到包含缩放值的数据框。 col3_s
包含均值和标准差的实际值:
mean_and_std
新创建的列 Row(col2_mean=6.5,col2_std=3.0276503540974917,col3_mean=6.333333333333333,col3_std=3.4641016151377544)
和 col2_s
现在可以用作 VectorAssembler 的输入列:
col3_s
在大型数据集上,此选项可能比原始缩放器慢一点,因为均值和标准差的值不是近似值而是精确计算的。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。