How to resolve a strange throughput bottleneck with the Apache HttpClients, both sync and async
I am trying to test the limits of the Apache HTTP client libraries, but I have run into a strange bottleneck. My test setup consists of the following:
- A WireMock server simulating a fixed 80 ms delay
- A unit test that uses org.apache.http.impl.nio.client.CloseableHttpAsyncClient to fire a configurable number of requests at the WireMock server as fast as possible, while collecting statistics
- A unit test that uses org.apache.http.impl.client.CloseableHttpClient to fire a configurable number of requests at the WireMock server from a configurable number of threads as fast as possible, while collecting statistics
- A unit test that uses org.springframework.web.reactive.function.client.WebClient to fire a configurable number of requests at the WireMock server as fast as possible, while collecting statistics
All three tests show the same performance numbers on my local machine: about 570 requests per second. CPU usage while running them is very low, around 5% utilization, so I can assume the bottleneck is not the CPU but somewhere else...
My question is: where is this bottleneck, and how can we scale past it?
My system configuration:
- Processor: 3.1 GHz Quad-Core Intel Core i7
- Memory: 16 GB 2133 MHz LPDDR3
- OS: OSX 10.15.5
- Java version: 11.0.4
My unit test:
package com.blakeparmeter.bottleneck_mystery;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
/**
* Used to illustrate a performance bottleneck with the apache HttpClients
*/
public class BottleneckTest {
// Test variables
final int totalTests = 5_000;
final long messageInterval = 1000; //ms
final URI testUri = URI.create("http://localhost:5000/wait/fixed/empty");
@Test
public void testSync() throws InterruptedException, ExecutionException {
final int numThreads = 100;
// Creates the sync client (unit under test)
final CloseableHttpClient unitUnderTest = HttpClientBuilder
.create()
.setMaxConnTotal(5000)
.setMaxConnPerRoute(5000)
.build();
// Run the test on an executor, send results to a stats aggregator
final ForkJoinPool executor = new ForkJoinPool(numThreads);
final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
executor.submit(() -> IntStream.range(0, numThreads)
.parallel()
.forEach(threadNum -> IntStream.range(0, totalTests / numThreads).forEach(testNum -> {
final long runStart = System.currentTimeMillis();
try (final CloseableHttpResponse response = unitUnderTest.execute(new HttpGet(testUri))) {
// we don't need to do anything with the response, just make sure it's sent.
} catch (final IOException e) {
Assertions.fail(e);
}
statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
})))
.get();
// print the stats one last time (await is not needed since we wait on the executor)
statsAggregator.printStats();
}
@Test
public void testAsync() throws InterruptedException {
// Creates the async client (unit under test)
final CloseableHttpAsyncClient unitUnderTest = HttpAsyncClients.custom()
.setMaxConnTotal(5000)
.setMaxConnPerRoute(5000)
.build();
unitUnderTest.start();
// Runs all of the tests, sends results to a stats aggregator
final CountDownLatch testCountdown = new CountDownLatch(totalTests);
final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
IntStream.range(0, totalTests).forEach(testNum -> {
final long runStart = System.currentTimeMillis();
unitUnderTest.execute(new HttpGet(testUri), new FutureCallback<>() {
@Override
public void completed(final HttpResponse response) {
statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
testCountdown.countDown();
}
@Override
public void failed(final Exception ex) {
Assertions.fail(ex.getMessage());
}
@Override
public void cancelled() {
Assertions.fail("Http Request Cancelled");
}
});
});
// await execution then print the stats one last time
testCountdown.await();
statsAggregator.printStats();
}
@Test
public void testReactive() {
final WebClient unitUnderTest = WebClient.builder().build();
// Runs all of the tests, sends results to a stats aggregator
final StatsAggregator statsAggregator = new StatsAggregator(totalTests, messageInterval);
Flux.range(0, totalTests)
.flatMap(testNum -> {
final long runStart = System.currentTimeMillis();
return unitUnderTest.get()
.uri(testUri)
.retrieve()
.bodyToMono(Object.class)
.doOnSuccess(obj -> statsAggregator.addTestDuration(System.currentTimeMillis() - runStart));
})
.then()
.block();
// print the stats one last time
statsAggregator.printStats();
}
}
StatsAggregator.java:
package com.blakeparmeter.bottleneck_mystery;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
/**
* @author Blake L. Parmeter
*/
public class StatsAggregator {
private static final DecimalFormat testCompleteFormat = new DecimalFormat("###,###,###");
private static final DecimalFormat avgRequestFormat = new DecimalFormat("###,###.##");
private final long start = System.currentTimeMillis();
private final List<Long> times;
private final TimerTask renderStatisticsTask;
// Creates a timer task to calculate and render runtime stats in realtime.
public StatsAggregator(final int totalTests, final long messageIntervalMillis) {
this.times = new ArrayList<>(totalTests);
renderStatisticsTask = new TimerTask() {
private Long lastLogTime = null;
private Integer lastLogSize = null;
@Override
public void run() {
// Init variables needed for calculations
final long logTime = System.currentTimeMillis();
final List<Long> statsCopy;
synchronized (times) {
if (!times.isEmpty()) {
statsCopy = new ArrayList<>(times);
} else {
System.out.println("No statistics have been loaded. Statistics will not be calculated.");
return;
}
}
Collections.sort(statsCopy);
// print execution completion status
System.out.println();
final double percentComplete = ((double) statsCopy.size() / (double) totalTests);
final long runtime = logTime - start; //ms
final double estimatedTimeRemaining = ((double) runtime / percentComplete) - (double) runtime; //ms
System.out.println(testCompleteFormat.format(statsCopy.size())
+ "\tTests completed of:"
+ testCompleteFormat.format(totalTests)
+ "\t"
+ avgRequestFormat.format(percentComplete * 100)
+ "% complete. "
+ "Running for: "
+ runtime / 1000d
+ " seconds. "
+ "Estimated Time remaining: "
+ testCompleteFormat.format(estimatedTimeRemaining / 1000d)
+ " seconds.");
// print running average requests / second
String sinceLastLogStats = "";
if (lastLogSize != null && lastLogTime != null) {
double numSinceLastLog = (double) statsCopy.size() - lastLogSize;
double timeSinceLastLog = (double) logTime - lastLogTime;
double avgReqPerSecSinceLastLogSec = 1000 * (numSinceLastLog / timeSinceLastLog);
sinceLastLogStats = "\tavg req/sec:"
+ avgRequestFormat.format(avgReqPerSecSinceLastLogSec)
+ "(since last run)";
}
lastLogSize = statsCopy.size();
lastLogTime = logTime;
double avgReqPerSec = 1000 * ((double) statsCopy.size() / (double) (logTime - start));
System.out.println("\tavg req/sec:"
+ avgRequestFormat.format(avgReqPerSec)
+ "(total)"
+ sinceLastLogStats);
// print average min and max
double avg = (double) statsCopy.stream().reduce(Long::sum).orElseThrow() / (double) statsCopy.size();
System.out.println("\tavg:" + avgRequestFormat.format(avg) +
"\tmin:" + statsCopy.get(0) +
"\tmax:" + statsCopy.get(statsCopy.size() - 1));
// print percentiles
System.out.println("\tRequest duration percentiles:\n" +
"\t\t1%:" + percentile(statsCopy, 1) +
"\t5%:" + percentile(statsCopy, 5) +
"\t10%:" + percentile(statsCopy, 10) +
"\t50%:" + percentile(statsCopy, 50) +
"\t90%:" + percentile(statsCopy, 90) +
"\t95%:" + percentile(statsCopy, 95) +
"\t99%:" + percentile(statsCopy, 99) +
"\t99.9%:" + percentile(statsCopy, 99.9) +
"\t99.99%:" + percentile(statsCopy, 99.99));
System.out.println("\tCalculations took:" + (System.currentTimeMillis() - logTime) + "ms.");
}
};
// Schedule printing of statistics on a timer
final Timer timer = new Timer("test-output-timer", true);
timer.schedule(renderStatisticsTask, messageIntervalMillis, messageIntervalMillis);
}
public void printStats() {
renderStatisticsTask.run();
}
public void addTestDuration(final long time) {
// Synchronize: this is called concurrently from many test threads,
// and the timer task copies the list under the same lock.
synchronized (times) {
times.add(time);
}
}
private static long percentile(List<Long> times, double percentile) {
int index = (int) Math.ceil(percentile / 100.0 * times.size());
return times.get(index - 1);
}
}
To run the WireMock simulation for this test:
- Download the standalone jar from http://wiremock.org/docs/running-standalone/ into a directory, referred to in these instructions as <wiremock_directory>. The downloaded file will be referred to as <wiremock.jar>.
- Create the directory <wiremock_directory>/mappings
- Create a file named endpoint.json with the following content and place it in <wiremock_directory>/mappings:
{
    "request": {
        "method": "GET",
        "urlPathPattern": "/wait/fixed/empty"
    },
    "response": {
        "headers": {
            "Content-Type": "application/json;charset=UTF-8"
        },
        "status": 200,
        "fixedDelayMilliseconds": 80
    }
}
- Run WireMock in a mode that can handle high concurrency with the following command:
java -jar <wiremock.jar> --port 5000 --container-threads 250 --jetty-acceptor-threads 200 --no-request-journal
Note: replace the jar file name in the command with that of the file you downloaded.
Solution
If you cannot max out your CPU cores, it means your threads are waiting, most likely on IO, since you are making network calls. In general, once IO is involved you are unlikely to produce a CPU-bound load. To find out what your threads are actually doing, take a thread dump while the test is running and inspect the stack traces.
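The standard tool for this is jstack (jstack &lt;pid&gt;), but you can also capture a dump programmatically from inside the running test. A minimal sketch (the ThreadDumpSnippet class name is made up for illustration):

```java
import java.util.Map;

public class ThreadDumpSnippet {
    // Prints every live thread's name, state, and stack trace,
    // and returns the number of threads seen.
    static int dump() {
        final Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
        for (final Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
            final Thread thread = entry.getKey();
            System.out.println(thread.getName() + " (" + thread.getState() + ")");
            for (final StackTraceElement frame : entry.getValue()) {
                System.out.println("\tat " + frame);
            }
        }
        return traces.size();
    }

    public static void main(final String[] args) {
        dump();
    }
}
```

If most client threads show up as WAITING or BLOCKED inside connection-pool or socket code, the limit is IO or a lock rather than CPU.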
A few other points:
- It is best not to use ForkJoinPool unless you are creating tasks designed for work stealing, i.e. ForkJoinTasks. In your case a plain ThreadPoolExecutor would likely perform better.
- You may be shooting yourself in the foot with the parallel stream. You create an executor with many threads, each of which receives a Runnable that drives the parallel-stream machinery, which in turn uses a shared ForkJoinPool sized to the number of logical CPU cores. If you have 8 logical cores, that means 100 threads driving tasks that all share one 8-thread pool to do their work. (Although if that were the case, and my math is right, you should only be seeing around 100 requests per second.)
- Collecting the timing of every request can also be costly; I would not want to call System.currentTimeMillis() twice per request. (That said, compared to the cost of the network request the HTTP client performs, it is most likely negligible.)
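The first two points can be addressed together: submit one ordinary Runnable per worker to a plain fixed-size pool, with no parallel stream and hence no hidden dependency on a shared ForkJoinPool. A minimal sketch, with the HTTP call replaced by a counter increment since only the threading structure matters here (FixedPoolSketch is a made-up name; the numbers mirror numThreads = 100 and totalTests = 5,000 from the question):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolSketch {
    // Runs totalTests no-op "requests" across a fixed-size pool
    // and returns how many of them completed.
    static int run(final int numThreads, final int totalTests) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(numThreads);
        // A plain fixed pool: each submitted Runnable runs on its own pool
        // thread, with no parallel-stream machinery in between.
        final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        for (int threadNum = 0; threadNum < numThreads; threadNum++) {
            executor.submit(() -> {
                try {
                    for (int testNum = 0; testNum < totalTests / numThreads; testNum++) {
                        // Placeholder for unitUnderTest.execute(new HttpGet(testUri))
                        completed.incrementAndGet();
                    }
                } finally {
                    done.countDown();
                }
            });
        }
        done.await();
        executor.shutdown();
        return completed.get();
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("completed=" + run(100, 5_000)); // prints completed=5000
    }
}
```

In the real test each worker would hold the loop body from testSync, calling unitUnderTest.execute and reporting to the StatsAggregator; this way all 100 workers make blocking calls concurrently instead of funneling through one small common pool.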