如何解决如何使用 MQ 在 Spark Structured Streaming 中添加故障恢复
我正在使用 activeMQ 从主题中读取消息
val df = spark
.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("brokerUrl",brokerUrl_)
.option("topic",topicName_)
.option("persistence","memory")
.option("cleanSession","true")
.option("username",username_)
.option("password",password_)
.load()
然后我将其写入 CSV 文件:
df
.writeStream
.outputMode("append")
.format("csv")
.option("checkpoint",checkpointLocation)
.option("path",path_)
.option("truncate",value = false)
.start
.awaitTermination()
假设我正在向此发送消息,并且在接收消息之前失败,然后在下一次开始时我想从该失败消息开始读取。这能实现吗??
编辑:
通过“向此发送消息”,我的意思是在 ActiveMQ 主题中排队一条消息,如果 spark 应用程序在收到消息之前失败,那么我如何读取失败的消息?
我曾尝试在 checkpoint
中使用 spark.sparkContext.setCheckpointDir(path_of_checkpoint)
,但由于偏移量不同,应用程序在接收任何新消息时崩溃,我猜 ActiveMQ 不支持从 kafka
之类的偏移量加载。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。