微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Spark 每小时使用一个 Kafka 主题

如何解决使用 Spark 每小时使用一个 Kafka 主题

我想批量使用 Kafka 主题,我想每小时读取 Kafka 主题并读取最新的每小时数据。

val readStream = existingSparkSession
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers",hostAddress)
  .option("subscribe","kafka.raw")
  .load()

但是这总是读取前 20 个数据行,并且这些行是从最开始的,因此永远不会选择最新的数据行。

如何使用 scala 和 spark 每小时读取最新的行?

解决方法

如果您在批处理模式下阅读 Kafka 消息,则需要注意记录哪些数据是新的,哪些不是您自己的。请记住,Spark 不会将任何消息提交回 Kafka,因此每次重新启动批处理作业时,它都会从头开始读取(或基于设置 startingOffsets,对于批处理查询默认为 earliest。>

对于您希望每小时运行一次作业并且仅处理前一小时到达 Kafka 的新数据的场景,您可以使用 writeStream 触发器选项 Trigger.Once 进行流式查询。

Databricks 中有一个很好的 blog,它很好地解释了为什么使用 Trigger.Once 的流式查询比批处理查询更受欢迎。

重点是:

“当您运行执行增量更新的批处理作业时,您通常必须弄清楚哪些数据是新数据,哪些数据应该处理,哪些不应该处理。Structured Streaming 已经为您完成了这一切。 "

确保您还在 writeStream 中设置了“checkpointLocation”选项。最后,您可以有一个简单的 cron 作业,每小时提交一次流作业。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。