微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 spring 云流功能进行条件路由

如何解决使用 spring 云流功能进行条件路由

在旧的命令式编程类型被弃用后,我遇到了一些问题。 我有两个微服务(一个作为发布者,另一个作为订阅者)并且以旧方式,使用注释 @StreamListener(target = "events",condition = "headers['type']=='consumerPermissionEvent'") 我能够有两个函数只侦听该记录,现在我不知道该怎么做.

我正在阅读所有文档 event routing 并尝试使用路由表达式,但两个消费者正在阅读所有记录。

一个微服务的应用yaml:

spring:
  cloud:
    stream:
      bindings:
        output: 
          destination: topicEvents

秒应用yaml为:

spring:
  cloud:
    function:
      routing-expression: headers['type']
      deFinition: consumerPermissionEvent;consumerApiEvent
    stream:
      bindings:
        consumerPermissionEvent-in-0:
          destination: topicUsers
        consumerApiEvent-in-0:
          destination: topicUsers

我从第一个这样的微服务发送:

@Autowired
private StreamBridge bridge;

public void send(PermissionEvent event){
    Message<PermissionEvent> message = MessageBuilder.withPayload(event)
            .setHeader("type","consumerPermissionEvent").build();
    bridge.send("output",message);
}

第二个微服务有两个消费者:

    @Bean
    public Consumer<Message<ApiEvent>> consumerApiEvent() {
        return e -> log.debug("READED API EVENT: {}",e.getPayload());
    }

    @Bean
    public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
        return e -> log.debug("READED PERMISSION EVENT: {}",e.getPayload());
    }

以及第二个微服务的输出日志:

[KafkaConsumerDestination{consumerDestinationName='topicUsers',partitions=1,dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers',dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)

知道怎么做吗?

提前致谢

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。