如何解决如何在反应堆中有条件地重复或重试
我使用SpringBoot和Webflux进行反应式编程。我想重复该服务,直到有可用数据为止(除了null之外,还会返回某些内容)
我有一个将一些数据插入数据库的服务,还有第二个使用该数据的服务。 我想继续从第二个服务查询数据库,直到有数据可用为止。下面的代码我试图使用Project Reactor来实现:
Mono<SubscriptionQueryResult<App,App>> subscriptionQuery = reactiveQueryGateway
.subscriptionQuery(new FindAppByIdQuery(appId),ResponseTypes.instanceOf(App.class),ResponseTypes.instanceOf(App.class));
subscriptionQuery
.filter(a -> Objects.nonNull(a.initialResult().block()))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100),Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30))).subscribe();
执行此操作时,出现以下异常:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking,which is not supported in thread parallel-1
在浏览webflux文档时,我发现在Reactor线程中无法调用block()函数。这样的尝试会导致上述错误:
要克服我在下面尝试过的问题,
subscriptionQuery
.flatMap(a -> a.initialResult())
.filter(a -> Objects.nonNull(a))
.repeatWhen(Repeat.onlyIf(repeatContext -> true)
.exponentialBackoff(Duration.ofMillis(100),Duration.ofSeconds(100))
.timeout(Duration.ofSeconds(30)))
.subscribe();
但是它没有给我想要的结果,我想我错过了一些东西。任何人都可以建议实现此目标的正确方法。
谢谢。
解决方法
让我尝试在这个问题上为您提供帮助。
实际上,解决的最佳方法是在发送命令之前先对其进行订阅。 这样,您就知道订阅查询何时发出更新。
我们有一个code-samples可以为您提供帮助。
要对此进行扩展,您最感兴趣的部分应该是CommandController上的该部分:
public <U> Mono<U> sendAndReturnUpdate(Object command,SubscriptionQueryResult<?,U> result) {
/* The trick here is to subscribe to initial results first,even it does not return any result
Subscribing to initialResult creates a buffer for updates,even that we didn't subscribe for updates yet
they will wait for us in buffer,after this we can safely send command,and then subscribe to updates */
return Mono.when(result.initialResult())
.then(Mono.fromCompletionStage(() -> commandGateway.send(command)))
.thenMany(result.updates())
.timeout(Duration.ofSeconds(5))
.next()
.doFinally(unused -> result.cancel());
/* dont forget to close subscription query on the end and add a timeout */
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。