微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

反应式 Kafka 接收器轮询新记录,即使旧记录也未确认

如何解决反应式 Kafka 接收器轮询新记录,即使旧记录也未确认

我正在尝试使用 Reactive Kafka Receiver 来处理 Kafka 消息。我希望消费者只有在我调查的最后一个被确认的情况下才会给我新消息。但是,即使我不运行 ack 我也会得到新的记录。这是一种理想的行为还是我缺少一些配置?

void consumeMessages() {
    kafkaConsumer(() -> true).subscribe();
}

Flux<Void> kafkaConsumer(Booleansupplier repeatCondition) {
    return reactiveConsumer
        .receive()
        .doOnError(error -> log.warn("Received error when get kafka message",error))
        .doOnNext(receivedRecord -> log.debug("Received event {}",receivedRecord.value()))
        .flatMap(this::handleReceivedRecord)
        .onErrorResume(error -> Flux.empty())
        .repeat(repeatCondition);
}


Flux<Void> handleReceivedRecord(ReceiverRecord<String,CustomEvent> receivedRecord) {
    return Mono.just(receivedRecord)
        .flatMapMany(
            record ->
                handleCustomEvent(record.value())
                    .doOnComplete(() -> {
                        log.debug("Committing offset for record: {}",record);
                        //record.receiverOffset().ackNowledge();
                    })
                    .onErrorMap(error -> new ReceiverRecordException(record,error)))
        .doOnError(error -> log.error("Error when processing CustomEvent",error))
        .retrywhen(
            Retry.backoff(consumerProperties.retries(),consumerProperties.minBackOff())
                .transientErrors(true))
        .onErrorResume(
            error -> {
                ReceiverRecordException ex = (ReceiverRecordException) error.getCause();
                log.warn("Retries exhausted for " + ex.getRecord().value());
                ex.getRecord().receiverOffset().ackNowledge();
                return Mono.empty();
            });
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。