
Kafka Structured Streaming application throws IllegalStateException when there is a gap in the offsets

How to resolve the IllegalStateException thrown by a Kafka Structured Streaming application when there is a gap in the offsets

I have a Structured Streaming application running against Kafka on Spark 2.3; the "spark-sql-kafka-0-10_2.11" version is 2.3.0.
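
For context, the pipeline looks roughly like this (a minimal sketch only; the broker address, topic name, and the foreach sink body are placeholders rather than the real application code):

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object KafkaStreamApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-structured-streaming")
      .getOrCreate()

    // Kafka source; "broker:9092" and "my-topic" are placeholder values.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "my-topic")
      .option("startingOffsets", "earliest")
      .load()

    // Foreach sink, consistent with the ForeachSink frames in the stack trace below.
    val query = df.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, version: Long): Boolean = true
        override def process(row: Row): Unit = println(row.getString(0)) // placeholder processing
        override def close(errorOrNull: Throwable): Unit = ()
      })
      .start()

    query.awaitTermination()
  }
}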

The application starts reading messages and processes them successfully, then after reaching a specific offset (shown in the exception message) it throws the following exception:

java.lang.IllegalStateException: Tried to fetch 666 but the returned record offset was 665
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
        at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
        at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:151)
        at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:142)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

It always fails at the same offset, and it looks like the cause is a gap in the offsets: in the Kafka UI I can see that offset 665 is followed by 667 (for some reason 666 was skipped), while the Kafka client inside my Structured Streaming application tries to fetch 666 and fails.
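
To confirm the gap independently of Spark, a plain Kafka consumer can be pointed at the same partition and told to seek to 665; a sketch along these lines (broker, topic, and partition are placeholders) prints the offsets that actually come back, and a jump from 665 straight to 667 confirms the gap:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._

object OffsetGapCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092")   // placeholder broker
    props.put("group.id", "offset-gap-check")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("my-topic", 0)      // placeholder topic/partition
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, 665L)

    // Print the first few offsets actually returned after seeking to 665.
    consumer.poll(5000L).asScala.take(5).foreach(r => println(s"offset=${r.offset()}"))
    consumer.close()
  }
}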

After digging into Spark's code, I found that they did not expect this to ever happen (according to the comment):

https://github.com/apache/spark/blob/branch-2.3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala#L297

So I'm wondering: am I doing something wrong, or is this a bug in the specific version I'm using?

Solution

A long-standing issue in Spark that caused a bit of an impedance mismatch between Kafka and Spark was fixed in Spark 2.4. Part of the fix was backported to Spark 2.3.1, but it is only enabled when a configuration option is set to true; as you observed, you have most likely run into a case that was not backported, so upgrading to Spark 2.4 may be worth considering.
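
If upgrading is an option, the change is essentially a dependency bump. Assuming an sbt build (your build tool may differ), something along these lines moves both Spark SQL and the Kafka source connector to the 2.4 line:

// build.sbt (sketch): bump Spark and the Kafka source connector together
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
)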
