微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Resilience4j 速率限制器在项目反应堆中无法正常工作?

如何解决Resilience4j 速率限制器在项目反应堆中无法正常工作?

我目前正在研究弹性 4j 库,但由于某种原因,以下代码无法按预期工作:

@Test
public void testRateLimiterProjectReactor()
{
    // The configuration below will allow 2 requests per second and a "timeout" of 2 seconds.
    RateLimiterConfig config = RateLimiterConfig.custom()
                                                .limitForPeriod(2)
                                                .limitRefreshPeriod(Duration.ofSeconds(1))
                                                .timeoutDuration(Duration.ofSeconds(2))
                                                .build();

    // Step 2.
    // Create a RateLimiter and use it.
    RateLimiterRegistry registry = RateLimiterRegistry.of(config);
    RateLimiter rateLimiter = registry.rateLimiter("myReactorServiceNameLimiter");

    // Step 3.
    Flux<Integer> flux = Flux.from(Flux.range(0,10))
                             .transformDeferred(RateLimiterOperator.of(rateLimiter))
                             .log()

        ;

    StepVerifier.create(flux)
                .expectNextCount(10)
                .expectComplete()
                .verify()
    ;
}

根据官方示例 herehere,这应该将 request() 限制为每秒 2 元素。但是,日志显示它正在立即获取所有元素:

15:08:24.587 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
15:08:24.619 [main] INFO reactor.Flux.Defer.1 - onSubscribe(RateLimiterSubscriber)
15:08:24.624 [main] INFO reactor.Flux.Defer.1 - request(unbounded)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(0)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(1)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(2)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(3)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(4)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(5)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(6)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(7)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(8)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onNext(9)
15:08:24.626 [main] INFO reactor.Flux.Defer.1 - onComplete()

我不明白出了什么问题?

解决方法

正如上面评论中已经回答的那样,RateLimiter 跟踪订阅数,而不是元素。要实现元素的速率限制,您可以使用 limitRate(和缓冲区 + delayElements)。 例如,

        Flux.range(1,100)
                .delayElements(Duration.ofMillis(100)) // to imitate a publisher that produces elements at a certain rate
                .log()
                .limitRate(10) // used to requests up to 10 elements from the publisher
                .buffer(10) // groups integers by 10 elements
                .delayElements(Duration.ofSeconds(2)) // emits a group of ints every 2 sec
                .subscribe(System.out::println);

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。