微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

即使使用 latestFirst 禁用,结构化流 kafka 也会首先写入最新文件

如何解决即使使用 latestFirst 禁用,结构化流 kafka 也会首先写入最新文件

我有一份工作,我从 S3/Alluxio 读取一些镶木地板,转换为 avro,从架构注册表中捕获架构并沉入 kafka。为此,在我的编写器中,我使用了 Trigger.Once()。

但是由于某种原因,当我检查我的 kafka 主题时,第一条写入记录来自最后的镶木地板。

即使禁用了 LatexFirst 选项也会发生这种情况。

我该如何解决这个问题?在窗口中执行 Order By ?

请帮忙!

val dataFrame = spark.readStream
              .option("mergeSchema","true")
              .option("maxFilesPerTrigger",5)
              .schema(dfSchema)
            //  .parquet(s"alluxio://tenants/table/*")
              .parquet(s"s3a://tenants/table/*")
    dataFrame.withColumn("cdcTime",to_timestamp(col("cdcTime")))

val avroDF = dataFrame.
  .withColumn("key",to_confluent_avro(col("key"),getSchemaRegistryConfigKey(table)))
  .withColumn("value",when(col("Op") === "D",lit(null))
  .otherwise(to_confluent_avro(col("value"),valueJsonAvroSchema,getSchemaRegistryConfigValue(table))))
  .drop("Op")

   avroDF.writeStream
      .format("kafka")
      .queryName(s"${table.schema}/${table.name}")
      .option("kafka.bootstrap.servers","mybrokerUrl")
      .option("topic",table.topic)
      .trigger(Trigger.Once())
      .option("checkpointLocation",s"checkpoints/${table.schema}/${table.name}")
      .start()
      .awaitTermination()

Ps:我使用 ABRIS 将我的镶木地板转换为 AVRO

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。