如何解决Spring Integration Flow DSL 与 SQS 和 Reactive
如何使用 DSL 为以下步骤设置反应式流程:
- 使用
SqsMessageDrivenChannelAdapter
接收 SQS 消息 - 验证 Json 消息 [
JsonSchemaValidator
类与validate
方法] - 将json转换为对象
- 将对象传递给服务激活器(
BusinessService
:业务逻辑、状态机) - 保留对象 R2DBC 出站适配器
在上面的示例中,创建了返回 Publisher
的专用流,并且在测试中发布者为 subscribed
。但是,当 SqsMessageDrivenChannelAdapter
将消息引入频道时,我的流程将被触发。
对于上述步骤 1 到 5 的场景,如何实现反应式流配置?
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload,messageHeaders) -> jsonSchemaValidator.validate(payload.toString()))
.transform(Transformers.fromJson(Entity.class))
.handle((payload,messageHeaders) ->businessService.process((Entity) payload))
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),ConsumerEndpointSpec::transactional)
.get();
}
@Bean
public MessageProducer sqsMessageDrivenChannelAdapter() {
SqsMessageDrivenChannelAdapter sqsMessageDrivenChannelAdapter =
new SqsMessageDrivenChannelAdapter(asyncSqsClient,queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setoutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
更新 2:使用执行器通道将 JPA 移动到差异线程
@Bean
public IntegrationFlow importFlow() {
IntegrationFlows.from(sqsInboundChannel())
.handle((payload,messageHeaders) ->businessService.process((Entity) payload))
.channel(persistChannel())
.handle(
Jpa.outboundAdapter(this.entityManagerFactory)
.entityClass(Entity)
.persistMode(PersistMode.PERSIST),queueName);
sqsMessageDrivenChannelAdapter.setAutoStartup(true);
sqsMessageDrivenChannelAdapter.setoutputChannel(sqsInboundChannel());
return sqsMessageDrivenChannelAdapter;
}
@Bean
public MessageChannel sqsInboundChannel() {
return MessageChannels.flux().get();
}
@Bean
public MessageChannel persistChannel() {
return MessageChannels.executor(Executors.newCachedThreadPool()).get();
}
解决方法
您可能需要让自己更加熟悉我们目前在 Spring Integration 中对 Reactive Streams 的了解:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams
您使用该测试类展示的示例与您的用例完全无关。在那个测试中,我们尝试覆盖我们在 Spring Integration 中公开的一些 API,有点像单元测试。它与整个流程无关。
您的用例实际上只是一个从 SQS 侦听器开始到 R2DBC 结束的完整黑盒流程。因此,您的流程中没有必要尝试将其一部分转换为 Publisher
,然后将其带回流程的另一部分:您不会跟踪某些方式并订阅该 {{1} } 自己。
您可以考虑在流程中的端点之间放置一个 Publisher
,但这对您的用例仍然没有意义。它不会像您期望的那样完全响应,因为 FluxMessageChannel
没有在使用者线程上阻塞以准备好接受来自下游的背压。
您的流程中唯一真正具有反应性的部分是 R2DBC 出站通道适配器,但它可能不会为您带来太多价值,因为数据源不是反应性的。
正如我所说:您可以尝试在 org.springframework.cloud.aws.messaging.listener.SimpleMessageListenerContainer
定义之后放置一个 channel(channels -> channels.flux())
以从该点开始反应流。同时,您应该尝试将 SqsMessageDrivenChannelAdapter
设置为 maxNumberOfMessages
以使其在从 SQS 提取下一条消息之前等待空闲空间。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。