微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 MQ 在 Spark Structured Streaming 中添加故障恢复

如何解决如何使用 MQ 在 Spark Structured Streaming 中添加故障恢复

我正在使用 activeMQ 从主题中读取消息

val df = spark
            .readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl",brokerUrl_)
            .option("topic",topicName_)
            .option("persistence","memory")
            .option("cleanSession","true")
            .option("username",username_)
            .option("password",password_)
            .load()

然后我将其写入 CSV 文件

            df
                .writeStream
                .outputMode("append")
                .format("csv")
                .option("checkpoint",checkpointLocation)
                .option("path",path_)
                .option("truncate",value = false)
                .start
                .awaitTermination()

假设我正在向此发送消息,并且在接收消息之前失败,然后在下一次开始时我想从该失败消息开始读取。这能实现吗??

编辑: 通过“向此发送消息”,我的意思是在 ActiveMQ 主题中排队一条消息,如果 spark 应用程序在收到消息之前失败,那么我如何读取失败的消息? 我曾尝试在 checkpoint 中使用 spark.sparkContext.setCheckpointDir(path_of_checkpoint),但由于偏移量不同,应用程序在接收任何新消息时崩溃,我猜 ActiveMQ 不支持kafka 之类的偏移量加载。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。