如何解决为什么我的 Spring MVCTomcat NIO、RestHighLevelClient在负载测试中的表现优于 WebfluxNetty、ReactiveElasticsearchClient?
我是响应式编程和 webflux 的新手,我正在评估从使用 WebMVC 的 Servlet Stack 上的 Spring Data Elasticsearch 应用程序迁移到使用 Spring Webflux 的响应式堆栈。
我开发了两个相同的简单 Spring Boot 应用程序,它们可以使用 Spring Data Elasticsearch Repositories 执行 CRUD 操作。
测试是用户索引/保存文档到 Elasticsearch。 然后负载测试是 500 个并发用户,启动时间 20 秒,每个用户 20 次迭代(总共 10000 个文档)
我期待 Netty 上的 Webflux 优于 Tomcat 上的 MVC,尤其是在并发用户较多的情况下。但结果恰恰相反。 Netty 的响应时间几乎是 tomcat 的两倍,我需要增加响应式客户端中的 maxConnection 队列,因为我收到了 ReadTimeoutExceptions。 那么我做错了什么?
问题:
- Netty 上的 Webflux 是否应该能够更好地处理更多并发用户?
- 为什么响应时间这么长? ...和 netty 上的吞吐量较低?
- 我是否需要以不同方式配置反应式客户端以获得更好的性能?
- 拥有数百个 NIO 线程的 Tomcat 能否处理更多请求并且比 Netty 事件循环更快?
应用程序具有以下堆栈:
Spring Web MVC:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<spring-boot-starter-web.version>2.4.4</spring-boot-starter-web.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
-
Tomcat (spring-boot-starter-tomcat:jar:2.4.2:compile)
-
用于对 Elasticsearch 的请求的 RestHighLevelClient
@Configuration public class MvcElasticsearchConfiguration extends AbstractElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public RestHighLevelClient elasticsearchClient() { final ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .build(); return RestClients.create(clientConfiguration).rest(); } private String getHostAndPort(){ return host +":"+ port; } }
控制器:
@PostMapping(value = "/index")
public ResponseEntity<PersonDocumentDto> indexGeneratedPersonDocument() {
PersonDocumentDto dto = this.service.indexGeneratedPersonDocument();
return new ResponseEntity<>(dto,HttpStatus.CREATED);
}
服务:
public PersonDocumentDto indexGeneratedPersonDocument(){
PersonDocument personDocument = personGenerator.generatePersonDoc();
PersonDocumentDto personDocumentDto = new PersonDocumentDto();
try {
personDocumentDto = EntityDtoUtil.toDto(this.repository.save(personDocument));
LOGGER.debug("Document indexed!");
} catch (Exception e) {
LOGGER.error("Unable to index document!",e);
}
return personDocumentDto;
}
Spring Webflux:
<properties>
<java.version>1.8</java.version>
<spring-boot-starter-data-elasticsearch.version>2.4.4</spring-boot-starter-data-elasticsearch.version>
<spring-boot-starter-webflux.version>2.4.4</spring-boot-starter-webflux.version>
<spring-boot-starter-test.version>2.4.4</spring-boot-starter-test.version>
<reactor-test.version>3.4.2</reactor-test.version>
<lombok.version>1.18.16</lombok.version>
<jfairy.version>0.5.9</jfairy.version>
<elasticsearch.version>7.12.0</elasticsearch.version>
</properties>
-
Netty (spring-boot-starter-reactor-netty:jar:2.4.2:compile )
-
用于对 Elasticsearch 的请求的 ReactiveElasticSearchClient
@Configuration public class ReactiveElasticsearchConfiguration extends AbstractReactiveElasticsearchConfiguration { @Value("${elasticsearch.host:localhost}") private String host; @Value("${elasticsearch.http.port:9200}") private int port; @Override @Bean public ReactiveElasticsearchClient reactiveElasticsearchClient() { ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo(getHostAndPort()) .withWebClientConfigurer(webClient -> { ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() .codecs(configurer -> configurer.defaultCodecs() .maxInMemorySize(-1)) .build(); String connectionProviderName = "myConnectionProvider"; int maxConnections = 1000; HttpClient httpClient = HttpClient.create(ConnectionProvider.create(connectionProviderName,maxConnections)); return webClient .mutate() .clientConnector(new ReactorClientHttpConnector(httpClient)) .exchangeStrategies(exchangeStrategies) .build(); }) .build(); return ReactiveRestClients.create(clientConfiguration); } private String getHostAndPort(){ return host +":"+ port; }
}
处理程序:
public Mono<ServerResponse> indexSingleGeneratedPersonDoc(ServerRequest serverRequest){
return this.service.indexGeneratedPersonDocument()
.flatMap(personDocumentDto -> ServerResponse.ok().bodyValue(personDocumentDto))
.onErrorResume(WebClientRequestException.class,e -> ServerResponse
.badRequest()
.bodyValue(Optional.ofNullable(e.getMessage()).orElseGet(() -> "Something went wrong!") ));
}
服务:
public Mono<PersonDocumentDto> indexGeneratedPersonDocument(){
return personGenerator.generatePersonDocument()
.flatMap(this.repository::save)
.map(EntityDtoUtil::toDto)
.doOnSuccess(response -> LOGGER.debug("Document indexed!"));
}
MVC 响应时间百分比: 500 users,20 iterations,10000 docs total
Webflux 响应时间百分比: 500 users,10000 docs total
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。