如何解决在同一个Spark结构化流作业中使用两个WriteStreams
我有一种情况,我想将相同的流数据帧保存到两个不同的流接收器。
我创建了一个流数据框,需要将其发送到Kafka主题和三角洲。
我曾考虑使用forEachBatch,但看起来它不支持多个STREAMING SINKS。
此外,我尝试将spark session.awaitAnyTermination()与多个写入流一起使用。但是第二个流没有得到处理!
有没有一种方法可以实现这一目标?!
这是我的代码:
- 我正在从Kafka流中读取数据,并创建一个流数据帧。
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","ingestionTopic1")
.load()
df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)").as[(String,String)]
- 将上述数据框写入Kafka主题
val ds1 = df.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9082")
.option("topic","outputTopic1")
.start()
- 将相同的流数据帧写入三角洲湖
val ds2 = df.format("delta")
.outputMode("append")
.option("checkpointLocation","/test/delta/events/_checkpoints/etlflow")
.start("/test/delta/events")
ds1.awaitTermination
ds2.awaitTermination
解决方法
将一个输入流用于多个输出流需要遵循以下几点:
-
您需要确保在两个输出流中具有两个不同的checkpointLocations。
-
此外,您需要确保在第二个输出查询上也具有writeStream调用。
-
总体而言,在等待两个查询终止之前启动两个查询很重要。 (您已经在执行此操作)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。