如何解决什么时候在 Apache Spark StreamingQueryListeners 中触发 onQueryTerminated?
我正在开发一个自定义的 StreamingQueryListener,我想在测试中触发它的 onQueryTerminated 方法。
这是我尝试实现的:
import org.apache.spark.sql.{ sqlContext,SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{ col,to_date }
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.scalatest.flatspec.AnyFlatSpec
class MyListener extends StreamingQueryListener {
override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {}
override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = println(event.exception)
}
class ListenerSpec extends AnyFlatSpec {
it should "trigger onQueryTerminated" in {
val spark = SparkSession.builder().master("local[*]").getorCreate()
spark.streams.addListener(new MyListener())
implicit val sqlContext: sqlContext = spark.sqlContext
import spark.implicits._
val stream = MemoryStream[Int]
stream.addData(Seq(1,3,4))
val query = stream
.toDF()
.withColumn("columnDoesntExist",to_date(col("names")))
.writeStream
.format("console")
.start()
query.awaitTermination()
}
}
但是,这不起作用,因为它引发了 AnalysisException 但 onQueryTerminated 方法不是由流式查询的终止触发的。
在什么情况下触发该方法并且 event.exception 是 Some(exception)?
更新
以下代码成功触发了onQueryTerminated
的执行:
val exceptionUdf = udf(() => throw new Exception())
val query = stream
.toDF()
.withColumn("exception",exceptionUdf())
.writeStream
.format("console")
.start()
有关原因的解释,请参阅已接受的答案。
解决方法
根据“Stream Processing with Apache Spark”(由 O'Reilly 出版)一书,onQueryTerminated
方法得到
"在流查询停止时调用。 event
包含与开始事件相关的 id
和 runId
字段。它还提供了一个 {如果查询因错误而失败,则包含 exception
的 {1}} 字段。”
当您收到 exception
时,您的查询甚至还没有开始。它只到了 Catalyst 优化器的四个阶段中的第一个阶段,也就是“分析”,还没有转化为可运行的代码:
有关 Catalyst Optimizer 的更多详细信息。
AnalysisException 仅表示与目录相关的代码中存在问题,这正是您打算执行的操作:引用不存在的列(在目录中)。
如果您想运行 AnalysisException
方法的执行,您需要实现一个工作代码,但在它已经运行时失败(例如,提供错误的数据输入类型)。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。