微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spring Cloud Stream 函数Router 输出尝试绑定到Kafka 主题

如何解决Spring Cloud Stream 函数Router 输出尝试绑定到Kafka 主题

我正在尝试迁移到 Spring Cloud Stream 的新函数式编程模型,替换像这样的条件 StreamListener 注释

@StreamListener("app-input",condition = "headers['eventName']=='Funded'")

类似的东西

@Bean
fun router() = MessageRoutingCallback {
    when (it.headers["eventName"]) {
        "Funded" -> "funded"
        else -> "ignored"
    }
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
    ...
}

@Bean
fun ignored() = Consumer { message: Message<*> ->
    ...
}

具有将频道链接主题的相关属性

spring.cloud.function.deFinition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic

我需要这种间接级别,因为有多种 Avro 消息类型都到达 MyTopic,需要反序列化并以不同方式路由

这一切都运行得很愉快,我可以按预期消费和路由消息。然而,以这种方式使用 functionRouter 有一个意想不到的和不需要的副作用,那就是当没有可用的主题时,它也会尝试将 functionRouter-out-0 绑定到 Kafka,因此应用程序每 30 秒尝试附加到代理上的一个名为“functionRouter-out-0”的主题,如您所料,因授权错误而失败。

2021-05-06 12:57:55.654 WARN  [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN  [screening]                            org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Error while fetching Metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_Failed}
2021-05-06 12:57:56.199 ERROR [screening]                            org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Topic authorization Failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]

所以问题是 a) 如何阻止 functionRouter-out-0 通道尝试绑定到 Kafka,或者 b) 我如何在不需要中间通道的情况下实现这一点?

Spring Cloud Stream event routing functionality autocreates new topic 类似,但从未收到答复。

解决方法

我相信这是一个错误。如果你想关注它,我打开了一个问题:

https://github.com/spring-cloud/spring-cloud-stream/issues/2168

作为一种解决方法,只需将其指向同一个目的地

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

由于代表都是 Consumer,我们实际上永远不会发送任何东西。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?