微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

pubSubReactiveFactory错误时重新启动无限通量

如何解决pubSubReactiveFactory错误时重新启动无限通量

我正在开发一个使用反应堆库与Google pubsub连接的应用程序。所以我有大量的信息。无论发生什么情况,我都希望它始终从队列中使用:这意味着要处理所有错误,以免终止通量。我正在考虑(极不可能)与pubsub的连接丢失或可能导致刚创建的Flux发出错误信号的事件。我想出了这个解决方案:


    private final PubSubReactiveFactory pubSubReactiveFactory;
    private final String requestSubscription;
    private final Long requestPollTime;
    private final Flux<AckNowledgeablePubsubMessage> requestFlux;

    @Autowired
    public FluxContainer(/* Field args...*/) {
        // init stuff...
        this.requestFlux = initRequestFlux();
    }

    private Flux<AckNowledgeablePubsubMessage> initRequestFlux() {
        return pubSubReactiveFactory.poll(requestSubscription,requestPollTime);
                .doOnError(e -> log.error("Fatal error: Could not retrieve message from queue. Resetting flux",e))
                .onErrorResume(e -> initRequestFlux());
    }

    @EventListener(ApplicationReadyEvent.class)
    public void configureFluxAndSubscribe() {
        log.info("Setting up requestFlux...");
        this.requestFlux
                .doOnNext(AckNowledgeablePubsubMessage::ack)
                // ...many more concatenated calls handling flux
    }

这有意义吗?我担心内存分配(我依靠gc清理东西)。欢迎任何评论

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。