微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

即使 AckMode 为 MANUAL_IMMEDIATE 并且消费者应用程序在调用acknowledgement.acknowledge() 之前停止,Spring-Kafka 偏移量也会被提交;

如何解决即使 AckMode 为 MANUAL_IMMEDIATE 并且消费者应用程序在调用acknowledgement.acknowledge() 之前停止,Spring-Kafka 偏移量也会被提交;

我正在使用 spring-kafka 运行 kafka 消费者服务。我已将 enable.auto.commit 设置为 false 并将 AckMode 设置为 MANUAL_IMMEDIATE。还使用并发等于 2 的 ConcurrentKafkaListenerContainerFactory。(我主题中的分区数)

现在我正在收听我的主题,我正在调试模式下运行我的应用程序,一旦我在我的 KafkaListener 方法中收到第一条消息,我就会在调用 AckNowledge.ackNowledge() 之前强行停止我的应用程序。

KafkaConfig class code :

  /**
   * create consumer factory for core user event consumer.
   */
  @Bean
  public ConsumerFactory<String,String> userConsumerFactory() {

    Map<String,Object> props = consumerConfigs();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,eventStreamingProperties.getEventhubUsersEventBootstrapServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class.getName());
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONfig,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,false);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
    props.put(ConsumerConfig.GROUP_ID_CONfig,"$Default");
    if (eventStreamingProperties.isEventhubUsersEventSecurityEnabled()) {
      props.put(
          CommonClientConfigs.Security_PROTOCOL_CONfig,"SASL_SSL");
      props.put(
          "sasl.mechanism","PLAIN");
      props.put(
          "sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required "
              + "username=\"$ConnectionString\" password=\"" + connectionStringForUserEvents + "\";");
    }

    return new DefaultKafkaConsumerFactory<>(props);
  }

  /**
   * create kafka listener container factory for core user event consumer.
   */
  @Bean
  public KafkaListenerContainerFactory kafkaUserListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String,String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    factory.setAutoStartup(true);
    factory.getContainerProperties().setSyncCommits(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler((record,exception) -> {
    },new FixedBackOff(retryInterval,retryMaxAttempts)));
    return factory;
  }

Listner Code :

  @KafkaListener(
      topics = "${events.user.topic}",concurrency = "${events.user.concurrency}",containerFactory = "kafkaUserListenerContainerFactory")
  public void processEvent(ConsumerRecord<String,String> record,AckNowledgment ackNowledgment,@Header("kafka_offset") int offSet) {
    log.info(String.format("In UserEventListener: processing the Event for user"
            + "migration  Key : %s,Partition : %s,Topic : %s"
            + ",Headers : %s",record.key(),record.partition(),record.topic(),record.headers().toString()));
    coreUserEventConsumer.validateAndConsumeEvent(record);
    ackNowledgment.ackNowledge();
  }

调用ackNowledgement.ackNowledge() 方法之前,我停止了在调试模式下运行的应用程序。当我重新启动我的应用程序时,它不会读取消息。所以我应该在这里做什么请帮忙。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。