Spark continuous streaming keeps resetting Kafka offsets

I am trying to set up a simple foreach-based continuous stream on Kafka with PySpark, and I get the following log output:

21/03/07 15:44:00 INFO AppInfoParser: Kafka version: 2.4.1
21/03/07 15:44:00 INFO AppInfoParser: Kafka commitId: c57222ae8cd7866b
21/03/07 15:44:00 INFO AppInfoParser: Kafka startTimeMs: 1615131840856
21/03/07 15:44:00 INFO KafkaConsumer: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Subscribed to partition(s): telegram-test-0
21/03/07 15:44:00 INFO KafkaConsumer: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Seeking to offset 2 for partition telegram-test-0
21/03/07 15:44:01 INFO ContinuousWriteRDD: Writer for partition 0 in epoch 0 is committing.
21/03/07 15:44:01 INFO ContinuousWriteRDD: Writer for partition 0 in epoch 0 committed.
21/03/07 15:44:01 INFO PythonRunner: Times: total = 582,boot = 394,init = 188,finish = 0
21/03/07 15:44:01 INFO ContinuousWriteRDD: Writer for partition 0 in epoch 1 is committing.
21/03/07 15:44:01 INFO ContinuousWriteRDD: Writer for partition 0 in epoch 1 committed.
21/03/07 15:44:01 INFO Metadata: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Cluster ID: 22a3Ek5jSY2Ja1P8zcdclQ
21/03/07 15:44:01 INFO SubscriptionState: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Seeking to EARLIEST offset of partition telegram-test-0
21/03/07 15:44:02 INFO SubscriptionState: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Resetting offset for partition telegram-test-0 to offset 0.
21/03/07 15:44:02 INFO SubscriptionState: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Seeking to LATEST offset of partition telegram-test-0
21/03/07 15:44:02 INFO SubscriptionState: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Resetting offset for partition telegram-test-0 to offset 2.
21/03/07 15:44:02 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/03/07 15:44:02 INFO SubscriptionState: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Resetting offset for partition telegram-test-0 to offset 2.
21/03/07 15:44:02 WARN KafkaDataConsumer: KafkaDataConsumer is not running in UninterruptibleThread. It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894
21/03/07 15:44:02 INFO KafkaConsumer: [Consumer clientId=consumer-spark-group-1-1,groupId=spark-group-1] Seeking to offset 2 for partition telegram-test-0

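It looks as if the Kafka readStream source keeps resetting the offset from latest to earliest. Unfortunately, this means the messages are never read correctly in the foreach() callback.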
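
To make the order of seeks and resets easier to see, the consumer lines from the log above can be reduced to a list of events. This is only a reading aid, not a diagnosis. The sample lines are copied from the log with the timestamp and client prefix dropped, and the names `LOG`, `SEEK`, `RESET` and `offset_events` are made up for this sketch.

```python
import re

# Consumer lines copied from the log above, timestamps and client prefix removed.
LOG = """\
KafkaConsumer: Seeking to offset 2 for partition telegram-test-0
SubscriptionState: Seeking to EARLIEST offset of partition telegram-test-0
SubscriptionState: Resetting offset for partition telegram-test-0 to offset 0.
SubscriptionState: Seeking to LATEST offset of partition telegram-test-0
SubscriptionState: Resetting offset for partition telegram-test-0 to offset 2.
SubscriptionState: Resetting offset for partition telegram-test-0 to offset 2.
KafkaConsumer: Seeking to offset 2 for partition telegram-test-0
"""

# Explicit seeks carry a numeric offset; resets report the offset that was resolved.
SEEK = re.compile(r"Seeking to offset (\d+) for partition (\S+)")
RESET = re.compile(r"Resetting offset for partition (\S+) to offset (\d+)")


def offset_events(log_text):
    """Return (kind, partition, offset) tuples in the order they appear in the log."""
    events = []
    for line in log_text.splitlines():
        match = SEEK.search(line)
        if match:
            events.append(("seek", match.group(2), int(match.group(1))))
            continue
        match = RESET.search(line)
        if match:
            events.append(("reset", match.group(1), int(match.group(2))))
    return events


for kind, partition, offset in offset_events(LOG):
    print(f"{kind:5s} {partition} -> {offset}")
```

In this excerpt the position goes 2, 0, 2, 2, 2, so the consumer ends up back at offset 2 after the reset to the earliest offset.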

Note: the exact same program works fine in micro-batch mode.
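
The question does not include the program itself, so the following is only a guess at the kind of job being described: a Kafka source, a foreach sink and a continuous trigger. The topic `telegram-test` and the group id `spark-group-1` are taken from the log. The broker address `localhost:9092`, the `startingOffsets` value, the one-second checkpoint interval and the helper names `kafka_source_options`, `handle_row`, `start_continuous_query` and `main` are assumptions. Setting `kafka.group.id` is likewise a guess at how the group id in the log was configured.

```python
def kafka_source_options(bootstrap_servers, topic, group_id=None):
    """Build the option map for Spark's Kafka source (the values passed in are assumptions)."""
    options = {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": "latest",
    }
    if group_id:
        # Assumed to be how the groupId seen in the log was set.
        options["kafka.group.id"] = group_id
    return options


def handle_row(row):
    """Per-row callback for the foreach sink; here it only prints the message value."""
    print(row.value)


def start_continuous_query(spark, options, continuous=True):
    """Start the streaming query; continuous=False switches to micro-batch mode."""
    reader = spark.readStream.format("kafka")
    for key, value in options.items():
        reader = reader.option(key, value)
    messages = reader.load().selectExpr("CAST(value AS STRING) AS value")

    writer = messages.writeStream.foreach(handle_row)
    if continuous:
        writer = writer.trigger(continuous="1 second")
    else:
        writer = writer.trigger(processingTime="1 second")
    return writer.start()


def main():
    # Imported here so the helpers above can be used without pyspark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("continuous-foreach-sketch").getOrCreate()
    options = kafka_source_options("localhost:9092", "telegram-test", "spark-group-1")
    start_continuous_query(spark, options).awaitTermination()
```

Calling `main()` needs pyspark, the Spark Kafka connector on the classpath and a reachable broker. Passing `continuous=False` to `start_continuous_query` gives the micro-batch variant that the note above reports as working, which makes it easy to compare the two modes with otherwise identical code.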
