微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka Streams 使用 Materialized.`as`(STORE_NAME) 失败

如何解决Kafka Streams 使用 Materialized.`as`(STORE_NAME) 失败

我正在创建一个流应用程序来丰富存储在表中的主题数据。一切正常,直到我添加 Materialized as api。任何人都知道为什么会发生这种情况?我在网上搜索了解释这一点的资源。

它总是在一个新的 kafka 实例上运行。

val source = builder.stream<String,Example>(INPUT)

val newValues = source
    .mapValues { key,value ->
        NewValue()
    }
    .toTable(Materialized.`as`(STORE_NAME))

如果我删除它,代码工作正常。

它抛出以下堆栈跟踪并终止流:

java.lang.IllegalStateException: Tried to lookup lag for unkNown task 0_1
at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$1311/000000000000000000.applyAsLong(UnkNown Source)
at java.base/java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
at java.base/java.util.Comparator$$Lambda$1312/000000000000000000.compare(UnkNown Source)
at java.base/java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
at java.base/java.util.Comparator$$Lambda$178/000000000000000000.compare(UnkNown Source)
at java.base/java.util.TreeMap.put(TreeMap.java:550)
at java.base/java.util.TreeSet.add(TreeSet.java:255)
at java.base/java.util.AbstractCollection.addAll(AbstractCollection.java:352)
at java.base/java.util.TreeSet.addAll(TreeSet.java:312)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPrevIoUsTasksByLag(StreamsPartitionAssignor.java:1265)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1179)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:930)
at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:394)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:590)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinleader(AbstractCoordinator.java:689)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1301)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)

编辑: 原来状态是在 testcontainer 测试的运行之间存储的。所以使用KafkaStreams#cleanup方法解决这个流状态冲突的问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。