微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

自 Spring Cloud Stream 3.1 版本起已弃用 EnableBinding、Output、Input

如何解决自 Spring Cloud Stream 3.1 版本起已弃用 EnableBinding、Output、Input

自 3.1 版起,用于处理队列的主要 API 已弃用。 在课堂评论中它说:

已弃用 从 3.1 开始支持函数式编程模型

我在网上搜索了很多解决方案,但没有找到关于我应该如何迁移的可靠的 E2E 解释。

寻找以下示例:

  1. 从队列中读取
  2. 写入队列

如果有几种方法可以做到这一点(正如我在网上看到的那样),我很乐意为每个选项提供解释和典型用例。

解决方法

  1. 我假设您已经熟悉主要概念,并将专注于迁移。
  2. 我在演示代码中使用了 kotlin,以减少冗长

首先,一些可能有帮助的参考资料:

  • 这是最初的相关文档:link
  • 这是对新功能格式中命名方案的解释:link
  • 这是一个更详细的解释,包含一些更高级的场景:link

TL;DR

Spring 现在不再使用基于注解的配置,而是使用检测到的 Consumer/Function/Supplier bean 来为您定义流。

输入/消费者

而在您之前的代码如下所示:

interface BindableGradesChannel {
    @Input
    fun gradesChannel(): SubscribableChannel

    companion object {
        const val INPUT = "gradesChannel"
    }
}

用法类似于:

@Service
@EnableBinding(BindableGradesChannel::class)
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)
    
    @StreamListener(BindableScoresChannel.INPUT)
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

现在整个定义都无关紧要了,可以这样做:

@Service
class GradesListener {
    private val log = LoggerFactory.getLogger(GradesListener::class.java)

    @Bean
    fun gradesChannel(): Consumer<Grade> {
        return Consumer { listen(grade = it) }
    }
    
    fun listen(grade: Grade) {
        log.info("Received $grade")
        // do something
    }
}

注意 Consumer bean 如何替换 @StreamListener@Input

关于配置,如果之前为了配置你有一个 application.yml 看起来像这样:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

现在应该是这样:

spring:
  cloud:
    stream:
      bindings:
        gradesChannel-in-0:
          destination: GradesExchange
          group: grades-updates
          consumer:
            concurrency: 10
            max-attempts: 3

注意 gradesChannel 是如何被 gradesChannel-in-0 替换的 - 要了解完整的命名约定,请参阅顶部的命名约定链接。

一些细节:

  1. 如果您的应用程序中有多个这样的 bean,则需要定义 spring.cloud.function.definition 属性。
  2. 您可以选择为频道指定自定义名称,因此如果您想继续使用 gradesChannel,您可以设置 spring.cloud.stream.function.bindings.gradesChannel-in-0=gradesChannel 并在配置 gradesChannel 中的任何地方使用。立>

输出/供应商

这里的概念是相似的,你将配置和代码替换成这样:

interface BindableStudentsChannel {
    @Output
    fun studentsChannel(): MessageChannel
}

@Service
@EnableBinding(BindableStudentsChannel::class)
class StudentsQueueWriter(private val studentsChannel: BindableStudentsChannel) {
    fun publish(message: Message<Student>) {
        studentsChannel.studentsChannel().send(message)
    }
}

现在可以替换为:

@Service
class StudentsQueueWriter {
    @Bean
    fun studentsChannel(): Supplier<Student> {
        return Supplier { Student("Adam") }
    }
}

如您所见,我们之间存在重大差异 - 何时调用以及由谁调用?

之前我们可以手动触发它,但现在它是由 spring 每秒触发的(默认情况下)。这适用于需要每秒发布传感器数据的用例,但是当您想在事件上发送消息时这并不好。除了出于某种原因使用 Function 之外,spring 还提供了两种选择:

StreamBridge - link

使用 StreamBridge 可以。像这样明确定义目标:

@Service
class StudentsQueueWriter(private val streamBridge: StreamBridge) {
    fun publish(message: Message<Student>) {
        streamBridge.send("studentsChannel-out-0",message)
    }
}

这样你就不会将目标通道定义为一个bean,但你仍然可以发送消息。缺点是您的类中有一些显式配置。

反应器 API - link

另一种方式是使用某种反应机制,例如EmitterProcessor,并返回它。使用此代码,您的代码将类似于:

@Service
class StudentsQueueWriter {
    val students: EmitterProcessor<Student> = EmitterProcessor.create()
    @Bean
    fun studentsChannel(): Supplier<Flux<Student>> {
        return Supplier { students }
    }
}

并且用法可能类似于:

class MyClass(val studentsQueueWriter: StudentsQueueWriter) {
    fun newStudent() {
        studentsQueueWriter.students.onNext(Student("Adam"))
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。