微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java反应器重试时间和超时

如何解决java反应器重试时间和超时


        Flux<Integer> flux2 = Flux.generate(AtomicInteger::new,(atomicInteger,synchronousSink) -> {
            if (atomicInteger.get() == 10) {
                synchronousSink.complete();
                return atomicInteger;
            }
            synchronousSink.next(atomicInteger.getAndIncrement());

            return atomicInteger;
        }).cast(Integer.class)
                .map(e -> {
                    if (e != 3) return e;
                    else {
                        try {
                            Thread.sleep(510L);
                        } catch (InterruptedException interruptedException) {
                            throw new RuntimeException(interruptedException);
                        }
                        System.out.println("sleeping");
                        return e;
                    }
                })
                .timeout(Duration.ofMillis(400L))
                .doOnError(System.out::println)
                .retrywhen(Retry.max(2).transientErrors(false));

出于某种原因,此代码不会在第二次重试时触发超时错误,适用于 Flux.just(1,2,3,4) 等静态数据,但在第一次重试后生成器不起作用。

15:15:45.518 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
0
1
2
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
0
1
2
sleeping
15:15:46.129 [main] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 400ms in 'map' (and no fallback has been configured)
sleeping

15:15:46.549 [parallel-3] DEBUG reactor.core.publisher.Operators - onNextDropped: 3
0
1
2
sleeping
3
4
5
6
7
8
9

P.S 在订阅特定线程后,由于某种原因它起作用了。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。