微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Consumer.committableSource在连接到Kafka集群时如何处理错误

如何解决Consumer.committableSource在连接到Kafka集群时如何处理错误

我正在尝试了解Apache Kafka Consumer.committableSource在几种错误情况下的行为。我目前要解决的情况是,我使用连接到的Kafka群集启动应用程序。 Kafka群集最终将再次恢复运行,现在我希望应用程序能够检测到它,重新建立连接,从而恢复操作。

到目前为止,我已经进行了测试,我相信committableSource已经实现了一些这种逻辑,因为如果我在不连接到集群的情况下启动应用程序,并且几分钟后,我启动了集群,启动集群后不久,应用程序将根据需要使用消息。 我发现这很奇怪,因为存在RestartSource.onFailuresWithBackoff这个概念,所以这是我的第一个问题:这种行为是否得到保证?我已经仔细阅读了文档,但并没有能够找到有关在这种情况下预期结果的任何参考。

但是,我一直在苦苦挣扎的事实是,我无法捕获任何表明这些连接正在重试或保持的异常或日志消息。

这是我正在运行的代码的简化版本

var actorSystem = ActorSystem.create(Behaviors.empty(),"X");
var classicActorSystem = toClassic(actorSystem);

ConsumerSettings<String,String> kafkaConsumerSettings =
ConsumerSettings.create(classicActorSystem,new StringDeserializer(),new StringDeserializer())
    .withBootstrapServers(config.getSourceBootstrapServers())
    .withGroupId(config.getGroupId())
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,"false")
    .withStopTimeout(Duration.ofSeconds(5))
    .withMetadataRequestTimeout(Duration.ofSeconds(5))
    .withPollTimeout(Duration.ofSeconds(5));

var control = Consumer.committableSource(kafkaConsumerSettings,Subscriptions.topics(config.getSourcetopics()))
        .map(this::mapMessages)
        .runWith(Sink.ignore(),actorSystem);

control.thenAccept(s -> System.out.println("Consuming finished"));

我已经在application.conf中全局激活了日志

akka {
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  log-config-on-start = on
  stream {
    materializer {
      debug-logging = on
    }
  }
}

并尝试修改代码,并对其进行补充

Function<Throwable,Supervision.Directive> decider =
    exc -> {
        System.out.println("Decider Log");
        return (Supervision.Directive) Supervision.resume();
    };

//----------

.withAttributes(ActorAttributes.withSupervisionStrategy(decider))

我很奇怪该框架没有内置此日志记录,但是尽管进行了这些尝试和其他尝试,但我仍然无法访问它,但是有人知道如何做吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?