微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么我的 Spring MVCTomcat NIO、RestHighLevelClient在负载测试中的表现优于 WebfluxNetty、ReactiveElasticsearchClient?

如何解决为什么我的 Spring MVCTomcat NIO、RestHighLevelClient在负载测试中的表现优于 WebfluxNetty、ReactiveElasticsearchClient?

我是响应式编程和 webflux 的新手,我正在评估从使用 WebMVC 的 Servlet Stack 上的 Spring Data Elasticsearch 应用程序迁移到使用 Spring Webflux 的响应式堆栈。

我开发了两个相同的简单 Spring Boot 应用程序,它们可以使用 Spring Data Elasticsearch Repositories 执行 CRUD 操作。

测试是用户索引/保存文档到 Elasticsearch。 然后负载测试是 500 个并发用户,启动时间 20 秒,每个用户 20 次迭代(总共 10000 个文档)

我期待 Netty 上的 Webflux 优于 Tomcat 上的 MVC,尤其是在并发用户较多的情况下。但结果恰恰相反。 Netty 的响应时间几乎是 tomcat 的两倍,我需要增加响应式客户端中的 maxConnection 队列,因为我收到了 ReadTimeoutExceptions。 那么我做错了什么?

问题:

  1. Netty 上的 Webflux 是否应该能够更好地处理更多并发用户
  2. 为什么响应时间这么长? ...和 ​​netty 上的吞吐量较低?
  3. 我是否需要以不同方式配置反应式客户端以获得更好的性能
  4. 拥有数百个 NIO 线程的 Tomcat 能否处理更多请求并且比 Netty 事件循环更快?

应用程序具有以下堆栈:

Spring Web MVC:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
  • Tomcat (spring-boot-starter-tomcat:jar:2.4.2:compile)

  • 用于对 Elasticsearch 的请求的 RestHighLevelClient

     @Configuration
     public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration {
    
     @Value("${elasticsearch.host:localhost}")
     private String host;
    
     @Value("${elasticsearch.http.port:9200}")
     private int port;
    
     @Override
     @Bean
     public RestHighLevelClient elasticsearchClient() {
    
         final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                 .connectedTo(getHostAndPort())
                 .build();
    
         return RestClients.create(clientConfiguration).rest();
     }
    
     private String getHostAndPort(){
         return host +":"+ port;
     }
     }
    

控制器:

@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {

    PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();

    return new ResponseEntity<>(dto,HttpStatus.CREATED);
}

服务:

public PersonDocumentDto indexGeneratedPersonDocument(){

    PersonDocument personDocument = personGenerator.generatePersonDoc();
    PersonDocumentDto personDocumentDto = new PersonDocumentDto();

    try {
        personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
        LOGGER.debug("Document indexed!");
    } catch (Exception e) {
        LOGGER.error("Unable to index document!",e);
    }

    return personDocumentDto;

}

Spring Webflux:

<properties>
    <java.version>1.8</java.version>
    <spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
    <spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
    <spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
    <reactor-test.version>3.4.2</reactor-test.version>
    <lombok.version>1.18.16</lombok.version>
    <jfairy.version>0.5.9</jfairy.version>
    <elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
  • Netty (spring-boot-starter-reactor-netty:jar:2.4.2:compile )

  • 用于对 Elasticsearch 的请求的 ReactiveElasticSearchClient

     @Configuration
     public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration {
    
     @Value("${elasticsearch.host:localhost}")
     private String host;
    
     @Value("${elasticsearch.http.port:9200}")
     private int port;
    
     @Override
     @Bean
     public ReactiveElasticsearchClient reactiveElasticsearchClient() {
         ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                 .connectedTo(getHostAndPort())
                 .withWebClientConfigurer(webClient -> {
                     ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                             .codecs(configurer -> configurer.defaultCodecs()
                                     .maxInMemorySize(-1))
                             .build();
                     String connectionProviderName = "myConnectionProvider";
                     int maxConnections = 1000;
                     HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName,maxConnections));
    
                     return webClient
                             .mutate()
                             .clientConnector(new ReactorClientHttpConnector(httpClient))
                             .exchangeStrategies(exchangeStrategies)
                             .build();
                 })
                 .build();
    
         return ReactiveRestClients.create(clientConfiguration);
     }
    
     private String getHostAndPort(){
         return host +":"+ port;
     }
    

    }

处理程序:

public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
    return this.service.indexGeneratedPersonDocument()
            .flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
            .onErrorResume(WebClientRequestException.class,e -> ServerResponse
                    .badRequest()
                    .bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));

}

服务:

public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){

    return personGenerator.generatePersonDocument()
            .flatMap(this.repository::save)
            .map(EntityDtoUtil::toDto)
            .doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}

MVC 响应时间百分比: 500 users,20 iterations,10000 docs total

Webflux 响应时间百分比: 500 users,10000 docs total

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。