How to fix failing to fit a JDBC-sourced dataset to a LinearRegression model
I am trying to build a linear regression model from a dataset that lives on a remote server.
First, I pull the data from RDS with the following code:
df = spark.read \
.format("jdbc") \
.option("url","jdbc:xxx") \
.option("dbtable","xxx") \
.option("user","xxx") \
.option("driver","org.postgresql.Driver") \
.option("password","xxx") \
.load()
After cleaning and preparing the dataset, I start building the model with MLlib as follows:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

s = list(set(df.columns) - {'SalePrice'})
assembler = VectorAssembler(inputCols=s, outputCol='features')
output = assembler.transform(df)
train_data, test_data = output.randomSplit([0.7, 0.3])
slr = LinearRegression(featuresCol='features', labelCol='SalePrice')
trained_model = slr.fit(train_data)
When I run this code, I get the following error:
Py4JJavaError: An error occurred while calling o862.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 134.0 failed 1 times,most recent failure: Lost task 0.0 in stage 134.0 (TID 325,host1,executor driver): org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$2973/665389743: (struct<MSSubClass_double_VectorAssembler_5c8b32d67297:double,YearBuilt_double_VectorAssembler_5c8b32d67297:double,BsmtFinSF1:double,PoolArea_double_VectorAssembler_5c8b32d67297:double,YearRemodAdd_double_VectorAssembler_5c8b32d67297:double,ndFlrSF_double_VectorAssembler_5c8b32d67297:double,BsmtHalfBath:double,EnclosedPorch_double_VectorAssembler_5c8b32d67297:double,TotRmsAbvGrd_double_VectorAssembler_5c8b32d67297:double,GarageCars:double,YrSold_double_VectorAssembler_5c8b32d67297:double,BedroomAbvGr_double_VectorAssembler_5c8b32d67297:double,BsmtFinSF2:double,HalfBath_double_VectorAssembler_5c8b32d67297:double,KitchenAbvGr_double_VectorAssembler_5c8b32d67297:double,WoodDeckSF_double_VectorAssembler_5c8b32d67297:double,Fireplaces_double_VectorAssembler_5c8b32d67297:double,GarageArea:double,BsmtUnfSF:double,OverallQual_double_VectorAssembler_5c8b32d67297:double,LotArea_double_VectorAssembler_5c8b32d67297:double,TotalBsmtSF:double,BsmtFullBath:double,OverallCond_double_VectorAssembler_5c8b32d67297:double,Id_double_VectorAssembler_5c8b32d67297:double,SsnPorch_double_VectorAssembler_5c8b32d67297:double,GrLivArea_double_VectorAssembler_5c8b32d67297:double,ScreenPorch_double_VectorAssembler_5c8b32d67297:double,MoSold_double_VectorAssembler_5c8b32d67297:double,FullBath_double_VectorAssembler_5c8b32d67297:double,LotFrontage:double,LowQualFinSF_double_VectorAssembler_5c8b32d67297:double,OpenPorchSF_double_VectorAssembler_5c8b32d67297:double,stFlrSF_double_VectorAssembler_5c8b32d67297:double,MiscVal_double_VectorAssembler_5c8b32d67297:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
... 31 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2194)
at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1157)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1151)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1220)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1196)
at org.apache.spark.ml.optim.WeightedLeastSquares.fit(WeightedLeastSquares.scala:107)
at org.apache.spark.ml.regression.LinearRegression.$anonfun$train$1(LinearRegression.scala:334)
at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:313)
at org.apache.spark.ml.regression.LinearRegression.train(LinearRegression.scala:180)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:150)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$2973/665389743: (struct<MSSubClass_double_VectorAssembler_5c8b32d67297:double,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:219)
at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:219)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1429)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$3(RDD.scala:1204)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$5(RDD.scala:1205)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
... 31 more
Note that if I read the same dataset from my local machine with spark.read.csv, no error occurs and I can fit and test the model successfully.
How can I fix this?