
Why can I convert a DStream[String] to a DStream[List[String]] but not to a DStream[DataFrame]?


My question concerns DStream processing in classic Spark Streaming.

I would like to know why converting a DStream[String] to a DStream[List[String]] works fine, but when I try to turn the resulting lists into DataFrames with the toDF() method, I get a NullPointerException:

// Wrap each incoming Kafka record in a Kafka case class
val data: DStream[Kafka] = stream.map(record => Kafka(record.partition, record.offset, record.topic, record.value))

// Collect all records of each partition into a single List[Kafka]
val groupedRDD: DStream[List[Kafka]] = data.mapPartitions(group => {
  val groupedData = group.toList
  List(groupedData).iterator
})
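
For context, these snippets reference a Kafka case class whose definition the question never shows; a minimal definition along these lines (an assumption, not the original code) would make them compile:

// Hypothetical definition, inferred from the four fields used above
// (the partition, offset, topic and value of a consumer record)
case class Kafka(partition: Int, offset: Long, topic: String, value: String)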

So far this code runs correctly, and the lists produced in the groupedRDD variable can be manipulated normally.

However, when I try to execute the following block, I get a NullPointerException:

// Attempt to turn each List[Kafka] into a DataFrame inside mapPartitions
val groupedRDD: DStream[DataFrame] = processMessages.mapPartitions(x => {
  val dataFrame = x.map(a => a.toDF)  // toDF is invoked on the executors here
  dataFrame
})

Can anyone explain why this happens? And is there a way to work around it?

Note: I already know I can convert the data to a DataFrame using foreachRDD, but I want to know whether there is any way to keep it in a stream of type DStream[DataFrame].
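
For reference, a minimal sketch of that foreachRDD approach, assuming the data stream from the first snippet and a SparkSession built on the driver (names here are illustrative, not from the original code):

import org.apache.spark.sql.SparkSession

// Sketch only: build the DataFrame once per micro-batch on the driver,
// where a live SparkSession is available, instead of calling toDF
// inside mapPartitions on the executors.
data.foreachRDD { rdd =>
  val spark = SparkSession.builder.getOrCreate()
  import spark.implicits._  // brings the RDD-to-DataFrame conversion into scope
  val df = rdd.toDF()       // rdd is RDD[Kafka]; columns come from the case class fields
  df.show()
}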

Edit:

This is the error I get every time I try to run the code:

21/06/29 20:32:02 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
21/06/29 20:32:02 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NullPointerException
    ... (same stack trace as task 0.0 above)
21/06/29 20:32:02 ERROR Executor: Exception in task 2.0 in stage 0.0 (TID 2)
java.lang.NullPointerException
    ... (same stack trace as task 0.0 above)
21/06/29 20:32:02 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
21/06/29 20:32:02 ERROR JobScheduler: Error running job streaming job 1625009520000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (CPX-G1J9NCNEEJ0.dir.svc.accenture.com executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:1012)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:1010)
    at app.AppStreaming$.$anonfun$appStreaming$7(AppStreaming.scala:87)
    at app.AppStreaming$.$anonfun$appStreaming$7$adapted(AppStreaming.scala:86)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2(DStream.scala:629)
    at org.apache.spark.streaming.dstream.DStream.$anonfun$foreachRDD$2$adapted(DStream.scala:629)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:417)
    at org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.Try$.apply(Try.scala:209)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1(JobScheduler.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at app.AppStreaming$.$anonfun$appStreaming$6(AppStreaming.scala:82)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:457)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    ... 3 more
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (CPX-G1J9NCNEEJ0.dir.svc.accenture.com executor driver): java.lang.NullPointerException
    ... (same executor stack trace and driver stacktrace as above)

Process finished with exit code 1
