如何解决直到达到最新的偏移量,Kafka消息才会写入拼花文件
我想阅读一个Kafka主题并写入一个Parquet或增量文件,并能够在阅读Kafka主题中的所有消息之前从该Parquet文件中读取。我已经进行了这项工作,但后来进行了更改,现在必须等到所有消息都用完后,实木复合地板文件中才包含任何内容。我的代码在下面。
import org.apache.spark.sql.SparkSession
object MinimalTest {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName("MinimalTest")
.getorCreate()
val kafkabrokers = "localhost:9092"
val topic = "Faketopic"
val startingOffsets = "earliest"
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkabrokers)
.option("startingOffsets",startingOffsets)
.option("subscribe",topic)
.load()
val path = "<dir>/MinimalTest"
val checkpointLocation = "<dir>/CheckpointMinimalTest"
df.writeStream
.format("parquet")
.outputMode("append")
.option("checkpointLocation",checkpointLocation)
.option("path",path)
.start()
spark.streams.awaitAnyTermination()
}
}
我没有找到任何遇到相同问题的人,也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“ enable.auto.commit”设置为true,但随后收到一条错误消息,提示不支持“ enable.auto.commit”。
我正在使用Spark.2.4.4
解决方法
您可以通过在Kafka源选项(Structured Streaming + Kafka Integration Guide)中设置maxOffsetsPerTrigger
来限制每个触发器处理的偏移量:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers",kafkaBrokers)
.option("startingOffsets",startingOffsets)
.option("maxOffsetsPerTrigger",10)
.option("subscribe",topic)
.load()
如果未定义maxOffsetsPerTrigger
,将使用最新的偏移量,如您在Spark 2.4.4 code中所见。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。