微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

直到达到最新的偏移量,Kafka消息才会写入拼花文件

如何解决直到达到最新的偏移量,Kafka消息才会写入拼花文件

我想阅读一个Kafka主题并写入一个Parquet或增量文件,并能够在阅读Kafka主题中的所有消息之前从该Parquet文件中读取。我已经进行了这项工作,但后来进行了更改,现在必须等到所有消息都用完后,实木复合地板文件中才包含任何内容。我的代码在下面。

import org.apache.spark.sql.SparkSession

object MinimalTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("MinimalTest")
      .getorCreate()

    val kafkabrokers = "localhost:9092"
    val topic = "Faketopic"

    val startingOffsets = "earliest"

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",kafkabrokers)
      .option("startingOffsets",startingOffsets)
      .option("subscribe",topic)
      .load()

    val path = "<dir>/MinimalTest"
    val checkpointLocation = "<dir>/CheckpointMinimalTest"

    df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation",checkpointLocation)
      .option("path",path)
      .start()

    spark.streams.awaitAnyTermination()
  }
}

我没有找到任何遇到相同问题的人,也没有通过阅读相关文档找到解决方案。我想有人告诉我要承诺。我尝试将“ enable.auto.commit”设置为true,但随后收到一条错误消息,提示不支持“ enable.auto.commit”。

我正在使用Spark.2.4.4

解决方法

您可以通过在Kafka源选项(Structured Streaming + Kafka Integration Guide)中设置maxOffsetsPerTrigger来限制每个触发器处理的偏移量:

val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",kafkaBrokers)
      .option("startingOffsets",startingOffsets)
      .option("maxOffsetsPerTrigger",10)
      .option("subscribe",topic)
      .load()

如果未定义maxOffsetsPerTrigger,将使用最新的偏移量,如您在Spark 2.4.4 code中所见。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。