apache HttpClients 的奇怪瓶颈同步和异步

如何解决apache HttpClients 的奇怪瓶颈同步和异步

我正在尝试测试 apache http 客户端库的限制,但遇到了一个奇怪的瓶颈。我的测试包括以下内容:

  1. 模拟固定 80 毫秒延迟的线模型服务器

  2. 一个单元测试,它使用 org.apache.http.impl.nio.client.CloseableHttpAsyncClient 尽可能快地向wiremock 服务器发出可配置数量的请求,同时收集统计数据。

  3. 一个单元测试,它使用 org.apache.http.impl.client.CloseableHttpClient 从可配置数量的线程中尽可能快地向 Wiremock 服务器发出可配置数量的请求,同时收集统计数据。

  4. 一个单元测试,它使用 org.springframework.web.reactive.function.client.WebClient 尽可能快地向wiremock 服务器发出可配置数量的请求,同时收集统计数据。

所有测试在我的本地机器上都显示了相同的性能数据,每秒 570 个请求。运行这些测试时 CPU 非常低,利用率约为 5%。所以我可以假设瓶颈不在 CPU 中,而是在其他地方......

我的问题是这个瓶颈在哪里,我们如何扩大它?

我的系统配置:

  • 处理器:3.1 GHz 四核 Intel Core i7
  • 内存:16 GB 2133 MHz LPDDR3
  • 操作系统:OSX 10.15.5
  • Java 版本:11.0.4

我的单元测试:

package com.blakeparmeter.bottleneck_mystery;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

/**
 * Used to illustrate a performance bottleneck with the apache HttpClients
 */
public class BottleneckTest {

    // Test variables
    final int totalTests = 5_000;
    final long messageInterval = 1000; //ms
    final URI testUri =  URI.create("http://localhost:5000/wait/fixed/empty");

    @Test
    public void testSync() throws InterruptedException,ExecutionException {

        final int numThreads = 100;

        // Creates the sync client (unit under test)
        final CloseableHttpClient unitUnderTest = HttpClientBuilder
                .create()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();

        // Run the test on an executor,send results to a stats aggregator
        final ForkJoinPool executor = new ForkJoinPool(numThreads);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests,messageInterval);
        executor.submit(() -> IntStream.range(0,numThreads)
                .parallel()
                .forEach(threadNum -> IntStream.range(0,totalTests / numThreads).forEach(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    try (final CloseableHttpResponse response = unitUnderTest.execute(new HttpGet(testUri))) {
                        // we don't need to do anything with the response,just make sure it's sent.
                    } catch (final IOException e) {
                        Assertions.fail(e);
                    }
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                })))
                .get();

        // print the stats one last time (await is not needed since we wait on the executor)
        statsAggregator.printStats();
    }

    @Test
    public void testAsync() throws InterruptedException {

        // Creates the async client (unit under test)
        final CloseableHttpAsyncClient unitUnderTest = HttpAsyncClients.custom()
                .setMaxConnTotal(5000)
                .setMaxConnPerRoute(5000)
                .build();
        unitUnderTest.start();

        // Runs all of the tests,sends results to a stats aggregator
        final CountDownLatch testCountdown = new CountDownLatch(totalTests);
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests,messageInterval);
        IntStream.range(0,totalTests).forEach(testNum -> {

            final long runStart = System.currentTimeMillis();
            unitUnderTest.execute(new HttpGet(testUri),new FutureCallback<>() {

                @Override
                public void completed(final HttpResponse response) {
                    statsAggregator.addTestDuration(System.currentTimeMillis() - runStart);
                    testCountdown.countDown();
                }

                @Override
                public void failed(final Exception ex) {
                    Assertions.fail(ex.getMessage());
                }

                @Override
                public void cancelled() {
                    Assertions.fail("Http Request Cancelled");
                }
            });
        });

        // await execution then print the stats one last time
        testCountdown.await();
        statsAggregator.printStats();
    }

    @Test
    public void testReactive() {

        final WebClient unitUnderTest = WebClient.builder().build();

        // Runs all of the tests,sends results to a stats aggregator
        final StatsAggregator statsAggregator = new StatsAggregator(totalTests,messageInterval);
        Flux.range(0,totalTests)
                .flatMap(testNum -> {
                    final long runStart = System.currentTimeMillis();
                    return unitUnderTest.get()
                            .uri(testUri)
                            .retrieve()
                            .bodyToMono(Object.class)
                            .doOnSuccess(obj -> statsAggregator.addTestDuration(System.currentTimeMillis() - runStart));
                })
                .then()
                .block();

        // print the stats one last time
        statsAggregator.printStats();
    }
}

StatsAggregator.java:

package com.blakeparmeter.bottleneck_mystery;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

/**
 * @author Blake L. Parmeter
 */
public class StatsAggregator {

    private static final DecimalFormat testCompleteFormat = new DecimalFormat("###,###,###");
    private static final DecimalFormat avgRequestFormat = new DecimalFormat("###,###.##");

    private final long start = System.currentTimeMillis();
    private final List<Long> times;
    private final TimerTask renderStatisticsTask;

    // Creates a timer task to calculate and render runtime stats in realtime.
    public StatsAggregator(final int totalTests,final long messageIntervalMillis) {

        this.times = new ArrayList<>(totalTests);

        renderStatisticsTask = new TimerTask() {

            private Long lastLogTime = null;
            private Integer lastLogSize = null;

            @Override
            public void run() {

                // Init variables needed for calculations
                final long logTime = System.currentTimeMillis();
                final List<Long> statsCopy;
                synchronized (times) {
                    if (!times.isEmpty()) {
                        statsCopy = new ArrayList<>(times);
                    } else {
                        System.out.println("No statistics have been loaded. Statistics will not be calculated.");
                        return;
                    }
                }
                Collections.sort(statsCopy);

                // print execution completion status
                System.out.println();
                final double percentComplete = ((double) statsCopy.size() / (double) totalTests);
                final long runtime = logTime - start; //ms
                final double estimatedTimeRemaining = ((double) runtime / percentComplete) - (double) runtime; //ms
                System.out.println(testCompleteFormat.format(statsCopy.size())
                        + "\tTests completed of:"
                        + testCompleteFormat.format(totalTests)
                        + "\t"
                        + avgRequestFormat.format(percentComplete * 100)
                        + "% complete. "
                        + "Running for: "
                        + runtime / 1000d
                        + " seconds. "
                        + "Estimated Time remaining: "
                        + testCompleteFormat.format(estimatedTimeRemaining / 1000d)
                        + " seconds.");

                // print running average requests / second
                String sinceLastLogStats = "";
                if (lastLogSize != null && lastLogTime != null) {
                    double numSinceLastLog = (double) statsCopy.size() - lastLogSize;
                    double timeSinceLastLog = (double) logTime - lastLogTime;
                    double avgReqPerSecSinceLastLogSec = 1000 * (numSinceLastLog / timeSinceLastLog);
                    sinceLastLogStats = "\tavg req/sec:"
                            + avgRequestFormat.format(avgReqPerSecSinceLastLogSec)
                            + "(since last run)";
                }
                lastLogSize = statsCopy.size();
                lastLogTime = logTime;
                double avgReqPerSec = 1000 * ((double) statsCopy.size() / (double) (logTime - start));
                System.out.println("\tavg req/sec:"
                        + avgRequestFormat.format(avgReqPerSec)
                        + "(total)"
                        + sinceLastLogStats);

                // print average min and max
                double avg = (double) statsCopy.stream().reduce(Long::sum).orElseThrow() / (double) statsCopy.size();
                System.out.println("\tavg:" + avgRequestFormat.format(avg) +
                        "\tmin:" + statsCopy.get(0) +
                        "\tmax:" + statsCopy.get(statsCopy.size() - 1));

                // print percentiles
                System.out.println("\tRequest duration percentiles:\n" +
                        "\t\t1%:" + percentile(statsCopy,1) +
                        "\t5%:" + percentile(statsCopy,5) +
                        "\t10%:" + percentile(statsCopy,10) +
                        "\t50%:" + percentile(statsCopy,50) +
                        "\t90%:" + percentile(statsCopy,90) +
                        "\t95%:" + percentile(statsCopy,95) +
                        "\t99%:" + percentile(statsCopy,99) +
                        "\t99.9%:" + percentile(statsCopy,99.9) +
                        "\t99.99%:" + percentile(statsCopy,99.99));

                System.out.println("\tCalculations took:" + (System.currentTimeMillis() - logTime) + "ms.");
            }
        };

        // Schedule printing of statistics on a timer
        final Timer timer = new Timer("test-output-timer",true);
            timer.schedule(renderStatisticsTask,messageIntervalMillis,messageIntervalMillis);
    }

    public void printStats() {
        renderStatisticsTask.run();
    }

    public void addTestDuration(final long time) {
        times.add(time);
    }

    private static long percentile(List<Long> times,double percentile) {
        int index = (int) Math.ceil(percentile / 100.0 * times.size());
        return times.get(index - 1);
    }
}

为此测试运行线模拟:

  1. 从此处下载独立 jar:http://wiremock.org/docs/running-standalone/ 到一个目录,在这些说明中将称为 <wiremock_directory>。下载的文件将被称为 <wiremock.jar>
  2. 创建目录<wiremock_directory>/mappings
  3. 创建一个名为 endpoint.json 的文件并将其放入 <wiremock_directory>/mappings
{
    "request": {
        "method": "GET","urlPathPattern": "/wait/fixed/empty"
    },"response": {
        "headers": {
            "Content-Type": "application/json;charset=UTF-8"
        },"status": 200,"fixedDelayMilliseconds": 80
    }
}
  1. 使用以下命令以可以处理高并发的模式运行wiremock:java -jar <wiremock.jar> --port 5000 --container-threads 250 --jetty-acceptor-threads 200 --no-request-journal 注意:用下载的文件替换命令中的文件名。

解决方法

如果您无法最大化 CPU 的内核数,则意味着您的线程正在等待。最有可能是 IO。当您拨打网络电话时。一般来说,一旦 IO 发挥作用,您就不太可能创建 CPU 绑定负载。要了解您的线程在做什么,请在您的测试运行时创建一个线程转储并查看它们的堆栈跟踪。

还有几件事:

  • 如果您不创建旨在窃取工作的任务,最好不要使用 ForkJoinPool。 IE。 ForkJoinTask 秒。在您的情况下,一个简单的ThreadPoolExecutor可能会表现得更好。
  • 您可能会用平行流射击自己的膝盖。您创建了一个具有许多线程的执行程序,每个线程都会收到一个 Runnable 来执行,该线程使用并行引擎,而并行引擎又使用一个共享的 ForkJoinPool,该线程的线程数与逻辑 CPU 内核的数量一样多。如果您有 8 个逻辑核心,则意味着您有 100 个线程驱动任务,这些任务都共享一个 8 线程池来完成它们的工作。 (不过,如果是这样的话,如果我的数学计算正确的话,您每秒应该只能收到大约 100 个请求。)
  • 收集每个请求的时间也可能代价高昂。我不想每次请求都调用 System.currentTimeMillis() 两次。 (不过,与 http 客户端执行的网络请求的成本相比,该成本很可能可以忽略不计。)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res