Apache Storm Kafka spout crashes after X tuples

I am currently facing a problem where most of our topologies crash after the Kafka spout has emitted roughly 100,000,000 tuples. I get the following log messages:

2021-06-09 05:58:26.288 o.a.k.c.c.i.Fetcher Thread-15-kafka_fennec-executor[5,5] [INFO] [Consumer clientId=consumer-scylla-real-time-values-uniques-topology-1, groupId=scylla-real-time-values-uniques-topology] Fetch offset 4647977280 is out of range for partition fennec-2, resetting offset
2021-06-09 05:58:26.296 o.a.k.c.c.i.SubscriptionState Thread-15-kafka_fennec-executor[5,groupId=scylla-real-time-values-uniques-topology] Resetting offset for partition fennec-2 to offset 4550519581.
2021-06-09 05:58:39.959 o.a.s.u.Utils Thread-15-kafka_fennec-executor[5,5] [ERROR] Async loop died!
java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.kafka.spout.internal.OffsetManager.findNextCommitOffset(OffsetManager.java:141) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:508) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:282) ~[stormjar.jar:?]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:192) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:159) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:392) [storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
2021-06-09 05:58:39.968 o.a.s.e.e.ReportError Thread-15-kafka_fennec-executor[5,5] [ERROR] Error
java.lang.RuntimeException: java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.utils.Utils$1.run(Utils.java:407) ~[storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.IllegalStateException: The offset [4550519581] is below the current nextCommitOffset [4647975756] for [fennec-2]. This should not be possible, and likely indicates a bug in the spout's acking or emit logic.
    at org.apache.storm.kafka.spout.internal.OffsetManager.findNextCommitOffset(OffsetManager.java:141) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.commitOffsetsForAckedTuples(KafkaSpout.java:508) ~[stormjar.jar:?]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:282) ~[stormjar.jar:?]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:192) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.spout.SpoutExecutor$2.call(SpoutExecutor.java:159) ~[storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$1.run(Utils.java:392) ~[storm-client-2.1.0.jar:2.1.0]
    ... 1 more
2021-06-09 05:58:39.993 o.a.s.u.Utils Thread-15-kafka_fennec-executor[5,5] [ERROR] Halting process: Worker died
java.lang.RuntimeException: Halting process: Worker died
    at org.apache.storm.utils.Utils.exitProcess(Utils.java:512) [storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.utils.Utils$3.run(Utils.java:835) [storm-client-2.1.0.jar:2.1.0]
    at org.apache.storm.executor.error.ReportErrorAndDie.uncaughtException(ReportErrorAndDie.java:41) [storm-client-2.1.0.jar:2.1.0]
    at java.lang.Thread.dispatchUncaughtException(Thread.java:1959) [?:1.8.0_275]
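
To make the failing invariant concrete, here is a simplified, hypothetical model of per-partition commit tracking. `OffsetInvariantSketch`, `ack` and `commit` are illustrative names invented for this sketch; this is not Storm's `OffsetManager` and does not reproduce its code. It only assumes what the error message states: acked offsets are committed as a contiguous run starting at `nextCommitOffset`, and an acked offset below that point is treated as impossible. Plugging in the fennec-2 numbers from the log shows the consumer position was rewound by 97,456,175 offsets, a gap of the same order as the roughly 100,000,000 tuples after which the topologies crash.

```java
import java.util.TreeSet;

// Hypothetical, simplified model of per-partition commit tracking.
// NOT Storm's OffsetManager: it only mirrors the invariant named in the error message.
public class OffsetInvariantSketch {

    // Next offset that a periodic commit would write for this partition.
    private long nextCommitOffset;
    // Offsets that were acked but are not yet covered by nextCommitOffset.
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();

    public OffsetInvariantSketch(long initialCommitOffset) {
        this.nextCommitOffset = initialCommitOffset;
    }

    // Record that the tuple emitted for this offset was acked.
    public void ack(long offset) {
        ackedOffsets.add(offset);
    }

    // Advance nextCommitOffset over the contiguous run of acked offsets.
    // An acked offset below nextCommitOffset means the consumer position went
    // backwards after that offset was already committed.
    public long commit() {
        while (!ackedOffsets.isEmpty()) {
            long lowest = ackedOffsets.first();
            if (lowest > nextCommitOffset) {
                break; // gap: an earlier offset is still in flight
            }
            if (lowest < nextCommitOffset) {
                throw new IllegalStateException("The offset [" + lowest
                        + "] is below the current nextCommitOffset [" + nextCommitOffset + "]");
            }
            ackedOffsets.pollFirst();
            nextCommitOffset = lowest + 1;
        }
        return nextCommitOffset;
    }

    public static void main(String[] args) {
        // Normal case: acks may arrive out of order; the commit point only
        // advances over a contiguous run.
        OffsetInvariantSketch demo = new OffsetInvariantSketch(10);
        demo.ack(11);
        System.out.println(demo.commit()); // 10: offset 10 is still in flight
        demo.ack(10);
        System.out.println(demo.commit()); // 12

        // Values taken from the fennec-2 log lines.
        long nextCommit = 4647975756L;
        long resetTo = 4550519581L;
        System.out.println("rewound by " + (nextCommit - resetTo) + " offsets");

        OffsetInvariantSketch fennec2 = new OffsetInvariantSketch(nextCommit);
        fennec2.ack(resetTo); // a tuple re-emitted from the reset position is acked
        try {
            fennec2.commit();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the exception is raised at commit time rather than at ack time, which matches where the stack trace places it (`findNextCommitOffset` called from `commitOffsetsForAckedTuples`).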

Kafka spout configuration:

// bootstrapServers, topic, groupId and getRetryService() are defined elsewhere in the topology code
KafkaSpoutConfig kafkaConfiguration = KafkaSpoutConfig.builder(bootstrapServers, topic)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName())
    .setOffsetCommitPeriodMs(10_000)   // commit acked offsets every 10 s
    .setRetry(getRetryService())       // retry policy for failed tuples
    .setRecordTranslator(
        // emit the raw record value as a single field named "capnp"
        consumerRecord -> new Values(consumerRecord.value()), new Fields("capnp")
    )
    .build();

return new KafkaSpout(kafkaConfiguration);

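Setting the processing guarantee to NONE works around the problem, but, as the log message says, this should not happen in the first place. Reducing the offset commit period and the maximum number of uncommitted offsets does not seem to help. https://issues.apache.org/jira/browse/STORM-2666 looks like the same issue, but we are running a version that already contains the fix, so it should not occur. Does anyone know how to fix this properly?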
