如何解决自 Spring Cloud Stream 3.1 版本起已弃用 EnableBinding、Output、Input
自 3.1 版起,用于处理队列的主要 API 已弃用。 在课堂评论中它说:
我在网上搜索了很多解决方案,但没有找到关于我应该如何迁移的可靠的 E2E 解释。
寻找以下示例:
- 从队列中读取
- 写入队列
如果有几种方法可以做到这一点(正如我在网上看到的那样),我很乐意为每个选项提供解释和典型用例。
解决方法
- 我假设您已经熟悉主要概念,并将专注于迁移。
- 我在演示代码中使用了 kotlin,以减少冗长
首先,一些可能有帮助的参考资料:
TL;DR
Spring 现在不再使用基于注解的配置,而是使用检测到的 Consumer
/Function
/Supplier
bean 来为您定义流。
输入/消费者
而在您之前的代码如下所示:
interface BindableGradesChannel {
@Input
fun gradesChannel(): SubscribableChannel
companion object {
const val INPUT = "gradesChannel"
}
}
用法类似于:
@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@StreamListener(BindableScoresChannel.INPUT)
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
现在整个定义都无关紧要了,可以这样做:
@Service
class GradesListener {
private val log = LoggerFactory.getLogger(GradesListener::class.java)
@Bean
fun gradesChannel(): Consumer<Grade> {
return Consumer { listen(grade = it) }
}
fun listen(grade: Grade) {
log.info("Received $grade")
// do something
}
}
注意 Consumer
bean 如何替换 @StreamListener
和 @Input
。
关于配置,如果之前为了配置你有一个 application.yml
看起来像这样:
spring:
cloud:
stream:
bindings:
gradesChannel:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
现在应该是这样:
spring:
cloud:
stream:
bindings:
gradesChannel-in-0:
destination: GradesExchange
group: grades-updates
consumer:
concurrency: 10
max-attempts: 3
注意 gradesChannel
是如何被 gradesChannel-in-0
替换的 - 要了解完整的命名约定,请参阅顶部的命名约定链接。
一些细节:
- 如果您的应用程序中有多个这样的 bean,则需要定义
spring.cloud.function.definition
属性。 - 您可以选择为频道指定自定义名称,因此如果您想继续使用
gradesChannel
,您可以设置spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel
并在配置gradesChannel
中的任何地方使用。立>
输出/供应商
这里的概念是相似的,你将配置和代码替换成这样:
interface BindableStudentsChannel {
@Output
fun studentsChannel(): MessageChannel
}
和
@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
fun publish(message: Message<Student>) {
studentsChannel.studentsChannel().send(message)
}
}
现在可以替换为:
@Service
class StudentsQueueWriter {
@Bean
fun studentsChannel(): Supplier<Student> {
return Supplier { Student("Adam") }
}
}
如您所见,我们之间存在重大差异 - 何时调用以及由谁调用?
之前我们可以手动触发它,但现在它是由 spring 每秒触发的(默认情况下)。这适用于需要每秒发布传感器数据的用例,但是当您想在事件上发送消息时这并不好。除了出于某种原因使用 Function
之外,spring 还提供了两种选择:
StreamBridge - link
使用 StreamBridge
可以。像这样明确定义目标:
@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
fun publish(message: Message<Student>) {
streamBridge.send("studentsChannel-out-0",message)
}
}
这样你就不会将目标通道定义为一个bean,但你仍然可以发送消息。缺点是您的类中有一些显式配置。
反应器 API - link
另一种方式是使用某种反应机制,例如EmitterProcessor
,并返回它。使用此代码,您的代码将类似于:
@Service
class StudentsQueueWriter {
val students: EmitterProcessor<Student> = EmitterProcessor.create()
@Bean
fun studentsChannel(): Supplier<Flux<Student>> {
return Supplier { students }
}
}
并且用法可能类似于:
class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
fun newStudent() {
studentsQueueWriter.students.onNext(Student("Adam"))
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。