
Caused by: MatchError: [Ljava.lang.String;@21a536b1 (of class [Ljava.lang.String;) when joining 2 DataFrames

How do I fix "Caused by: MatchError: [Ljava.lang.String;@21a536b1 (of class [Ljava.lang.String;)" when joining 2 DataFrames?

I am trying to join two DataFrames in Apache Spark using Scala, in a Databricks environment. When joining them I get an error, and I cannot figure out what the problem is or how to solve it. Any help is greatly appreciated.

First input file

   %scala
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.functions._

   // Each line holds three space-separated integers: user id, artist id, play count
   val rawUserArtistData = sc.textFile("/FileStore/tables/user_artist_data.txt")
   val rawUserArtistDataDF = rawUserArtistData
     .map(_.split(" "))
     .map { case Array(a, b, c) => (a.toInt, b.toInt, c.toInt) }
     .toDF("userid", "artist_id", "playcount")

   rawUserArtistDataDF.show()

Output

   +-------+---------+---------+
   | userid|artist_id|playcount|
   +-------+---------+---------+
   |1000002|        1|       55|
   |1000002|  1000006|       33|
   |1000002|  1000007|        8|
   |1000002|  1000009|      144|
   |1000002|  1000010|      314|
   |1000002|  1000013|        8|
   |1000002|  1000014|       42|
   |1000002|  1000017|       69|
   |1000002|  1000024|      329|
   |1000002|  1000025|        1|
   +-------+---------+---------+
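
For reference, each line of user_artist_data.txt holds three space-separated integers, which is why the `case Array(a,b,c)` pattern above can extract the fields directly. A minimal sketch of that parse applied to a single well-formed line (the sample value is taken from the output above):

   %scala
   // the same split-and-match step, applied to one line
   val line = "1000002 1 55"
   val parsed = line.split(" ") match {
     case Array(a, b, c) => (a.toInt, b.toInt, c.toInt)
   }
   // parsed: (Int, Int, Int) = (1000002,1,55)

Note the pattern has only one case, so any line that does not split into exactly three fields would throw a scala.MatchError; that detail matters below.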

Second input file

 %scala
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.functions._

 // Each line is expected to hold a numeric artist id and a name, separated by a tab
 val rawArtistData = sc.textFile("/FileStore/tables/artist_data.txt")
 val rawArtistDataDF = rawArtistData
   .map(_.split("\t"))
   .map { case Array(a, b) => (a.toInt, b) } // single-case match: fails on malformed lines
   .toDF("artistid", "artist_name")

 rawArtistDataDF.show(10, false)

Output

 +--------+---------------------------------+
 |artistid|artist_name                      |
 +--------+---------------------------------+
 |1134999 |06Crazy Life                     |
 |6821360 |Pang Nakarin                     |
 |10113088|Terfel,Bartoli- Mozart: Don     |
 |10151459|The Flaming Sidebur              |
 |6826647 |Bodenstandig 3000                |
 |10186265|Jota Quest e Ivete Sangalo       |
 |6828986 |Toto_XX (1977                    |
 |10236364|U.S Bombs -                      |
 |1135000 |artist formaly kNow as Mat       |
 |10299728|Kassierer - Musik für beide Ohren|
 +--------+---------------------------------+ 

Code to join the two DataFrames

%scala

val CombinedDF = rawUserArtistDataDF.join(rawArtistDataDF,
  rawUserArtistDataDF("artist_id") === rawArtistDataDF("artistid"), "leftouter")

CombinedDF.show()

Error

 Job aborted due to stage failure.
 Caused by: MatchError: [Ljava.lang.String;@21a536b1 (of class [Ljava.lang.String;)
 at $line72e2ce7142694dbeb5cc11da58bc59cb37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$rawArtistDataDF$2(command-1764271964671849:5)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:754)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:148)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:732)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:735)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2766)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2713)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2707)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2707)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1256)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2974)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2915)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2903)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: scala.MatchError: [Ljava.lang.String;@21a536b1 (of class [Ljava.lang.String;)
    at $line72e2ce7142694dbeb5cc11da58bc59cb37.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$rawArtistDataDF$2(command-1764271964671849:5)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:754)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:155)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:148)
    at org.apache.spark.scheduler.Task.run(Task.scala:117)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$10(Executor.scala:732)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:735)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Solution

The problem was with my second file: artist_data.txt contains malformed lines, e.g. lines with no tab separator or with a non-numeric artist id. For such lines split("\t") does not produce exactly two fields, so the single-case pattern match case Array(a,b) throws scala.MatchError. The earlier show(10, false) succeeded only because Spark lazily reads just enough data to display ten rows; the bad lines sit further down the file and only surface once the join forces a full scan. I solved it with the defensive parse shown after the sketch below.
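
A minimal sketch of why the match blows up (the sample line is hypothetical; any line that split("\t") does not turn into a two-element array triggers the same error):

%scala
// A line without a tab splits into a one-element array...
val badLine = "1134999 06Crazy Life" // hypothetical malformed line: space instead of tab
val fields = badLine.split("\t")     // Array("1134999 06Crazy Life") -- length 1

// ...which the single-case pattern cannot match, so Scala throws
// scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)
val parsed = fields match {
  case Array(a, b) => (a.toInt, b)
}

The fix replaces the strict match with a defensive parse that drops malformed lines instead of failing: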

%scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val rawArtistData = sc.textFile("/FileStore/tables/artist_data.txt")

val rawArtistDataDF = rawArtistData.flatMap { line =>
  // span splits at the first tab: id is everything before it, name keeps the tab as a prefix
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None                                    // no tab on this line: drop it
  } else {
    try {
      Some((id.toInt, name.trim))           // trim strips the leading tab
    } catch {
      case _: NumberFormatException => None // non-numeric id: drop the line
    }
  }
}.toDF("artistid", "artist_name")
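
For completeness, the same cleanup can be written with the DataFrame API instead of an RDD. This is an alternative sketch under the same assumptions (tab-separated file, integer id in the first field), not the original fix, and the name rawArtistDataDF2 is hypothetical; note that, unlike span, it also drops lines whose name contains an embedded tab:

%scala
import org.apache.spark.sql.functions._

val rawArtistDataDF2 = spark.read.text("/FileStore/tables/artist_data.txt")
  .select(split(col("value"), "\t").as("fields"))
  .filter(size(col("fields")) === 2)         // keep only lines with exactly one tab
  .select(
    col("fields").getItem(0).cast("int").as("artistid"),
    trim(col("fields").getItem(1)).as("artist_name"))
  .filter(col("artistid").isNotNull)         // a non-numeric id casts to null; drop it

With either cleaned DataFrame the join from above runs; as a quick sanity check (column names as in the code above), one can count plays whose artist_id found no match and therefore carry null artist columns after the left-outer join:

%scala
val CombinedDF = rawUserArtistDataDF.join(rawArtistDataDF,
  rawUserArtistDataDF("artist_id") === rawArtistDataDF("artistid"), "leftouter")

CombinedDF.filter(col("artistid").isNull).count() // 0 means every play matched an artist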
