
Why does blocking work with thenApplyAsync but not with thenApply?

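We are seeing some interesting behavior in our application, and the following Spock specification captures it. I am trying to understand why the call in the second test succeeds while the call in the first test ends in a TimeoutException.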

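Summary: There is a mock server with a mock endpoint that responds successfully after a 10 ms delay. We use AsyncHttpClient to make a non-blocking call to this endpoint. That first call is chained with a second, blocking call to the same endpoint. With thenApply, the first call succeeds but the second call fails with a timeout; with thenApplyAsync, both calls succeed. In both cases the mock server appears to respond within 10 ms.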
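The difference between the two methods that matters here is which thread runs the callback. The following is a minimal JDK-only sketch, not the code from the spec: the single-thread executor named "io-thread" is a hypothetical stand-in for whichever client thread completes the response future. When the callback is registered before the future completes, thenApply runs it on the completing thread, while thenApplyAsync hands it to another executor (by default the common ForkJoinPool).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackThreadDemo {

    /**
     * Returns the names of the threads that ran the thenApply callback (index 0)
     * and the thenApplyAsync callback (index 1).
     */
    static String[] observeThreads() throws Exception {
        // Hypothetical stand-in for the HTTP client's I/O thread.
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();

            // Both callbacks are registered before the future completes.
            CompletableFuture<String> sync =
                    response.thenApply(body -> Thread.currentThread().getName());
            CompletableFuture<String> async =
                    response.thenApplyAsync(body -> Thread.currentThread().getName());

            // Complete the future from the I/O thread, as a client would
            // when the response arrives.
            ioThread.execute(() -> response.complete("done"));

            return new String[] { sync.get(), async.get() };
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] names = observeThreads();
        System.out.println("thenApply ran on:      " + names[0]);
        System.out.println("thenApplyAsync ran on: " + names[1]);
    }
}
```

The name of the second thread depends on the JVM and the machine, so the sketch only relies on it differing from "io-thread".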

Dependencies:


    implementation 'com.google.guava:guava:29.0-jre'
    implementation 'org.asynchttpclient:async-http-client:2.12.1'

    // Use the latest Groovy version for Spock testing
    testImplementation 'org.codehaus.groovy:groovy-all:2.5.11'

    // Use the awesome Spock testing and specification framework even with Java
    testImplementation 'org.spockframework:spock-core:1.3-groovy-2.5'
    testImplementation 'org.objenesis:objenesis:1.4'
    testImplementation "cglib:cglib:2.2"
    testImplementation 'junit:junit:4.13'
    testImplementation 'org.mock-server:mockserver-netty:5.11.1'

Spock specification:


package com.switchcase.asyncthroughput

import com.google.common.base.Charsets
import org.asynchttpclient.DefaultAsyncHttpClient
import org.asynchttpclient.RequestBuilder
import org.mockserver.integration.ClientAndServer
import org.mockserver.model.HttpResponse
import spock.lang.Shared
import spock.lang.Specification

import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

import static org.mockserver.integration.ClientAndServer.startClientAndServer
import static org.mockserver.model.HttpRequest.request

class CompletableFutureThreadsTest extends Specification {

    @Shared
    ClientAndServer mockServer

    def asyncHttpClient = new DefaultAsyncHttpClient();

    def setupSpec() {
        mockServer = startClientAndServer(9192);
        // Create a mock endpoint that responds with "done" after a 10 ms delay.
        mockServer.when(request()
                .withMethod("POST")
                .withPath("/validate"))
                .respond(HttpResponse.response().withBody("done")
                        .withStatusCode(200)
                        .withDelay(TimeUnit.MILLISECONDS,10));
    }

    def "Calls external using AHC with a blocking call with 1sec timeout results in TimeoutException."() {
        when:
        callExternal().thenApply({ resp -> callExternalBlocking() }).join()

        then:
        def exception = thrown(CompletionException)
        exception instanceof CompletionException
        exception.getCause() instanceof TimeoutException
        exception.printStackTrace()
    }

    def "Calls external using AHC with a blocking call on ForkJoinPool with 1sec timeout results in success."() {
        when:
        def value = callExternal().thenApplyAsync({ resp -> callExternalBlocking() }).join()

        then:
        value == "done"
    }

    def cleanupSpec() {
        mockServer.stop(true)
    }

    private CompletableFuture<String> callExternal(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternal Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        })
    }

    private String callExternalBlocking(def timeout = 1000) {
        RequestBuilder requestBuilder = RequestBuilder.newInstance();
        requestBuilder.setMethod("POST").setUrl("http://localhost:9192/validate").setRequestTimeout(timeout)
        def cf = asyncHttpClient.executeRequest(requestBuilder).toCompletableFuture()
        return cf.thenApply({ response ->
            println("CallExternalBlocking Succeeded.")
            return response.getResponseBody(Charsets.UTF_8)
        }).join()
    }
}
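One plausible reading of the spec, which the source does not confirm, is that the thenApply callback runs on the client thread that also has to deliver the second response, so join() blocks the very thread it is waiting on. The sketch below models that situation with the JDK alone. FakeClient, its single "io-thread", and the 100 ms delay are all assumptions made for illustration; it is not AsyncHttpClient and makes no claim about its internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedIoThreadDemo {

    /** Hypothetical client: one I/O thread delivers every response after a short delay. */
    static final class FakeClient implements AutoCloseable {
        private final ScheduledExecutorService ioThread =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "io-thread"));

        CompletableFuture<String> call() {
            CompletableFuture<String> cf = new CompletableFuture<>();
            // The response is "received" on the I/O thread 100 ms later.
            ioThread.schedule(() -> { cf.complete("done"); }, 100, TimeUnit.MILLISECONDS);
            return cf;
        }

        @Override
        public void close() {
            ioThread.shutdownNow();
        }
    }

    /** Issues a second call and blocks the current thread for up to one second. */
    static String callBlocking(FakeClient client) {
        try {
            return client.call().get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The callback runs on the I/O thread, which then cannot deliver the second response. */
    static String chainWithThenApply() throws Exception {
        try (FakeClient client = new FakeClient()) {
            return client.call().thenApply(first -> callBlocking(client)).get();
        }
    }

    /** The callback runs on another thread, so the I/O thread stays free. */
    static String chainWithThenApplyAsync() throws Exception {
        try (FakeClient client = new FakeClient()) {
            return client.call().thenApplyAsync(first -> callBlocking(client)).get();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("thenApply:      " + chainWithThenApply());
        System.out.println("thenApplyAsync: " + chainWithThenApplyAsync());
    }
}
```

In this model the thenApply variant waits out the full one-second timeout, because the task that would complete the second future is queued behind the blocked callback on the same thread.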

Edit:

Debug log and stack trace for the timeout (the timeout occurs on the remote call inside callExternalBlocking):

17:37:38.885 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.timeout.TimeoutTimerTask - Request timeout to localhost/127.0.0.1:9192 after 1000 ms for NettyResponseFuture{currentRetry=0,isDone=0,isCancelled=0,asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@478251c9,nettyRequest=org.asynchttpclient.netty.request.NettyRequest@4945b749,future=java.util.concurrent.CompletableFuture@4d7a3ab9[Not completed,1 dependents],uri=http://localhost:9192/validate,keepAlive=true,redirectCount=0,timeoutsHolder=org.asynchttpclient.netty.timeout.TimeoutsHolder@878bd72,inAuth=0,touch=1622248657866} after 1019 ms
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.channel.ChannelManager - Closing Channel [id: 0x5485056c,L:/127.0.0.1:58076 - R:localhost/127.0.0.1:9192] 
17:37:38.886 [AsyncHttpClient-timer-2-1] DEBUG org.asynchttpclient.netty.request.NettyRequestSender - Aborting Future NettyResponseFuture{currentRetry=0,touch=1622248657866}

java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
    at org.asynchttpclient.netty.NettyResponseFuture.abort(NettyResponseFuture.java:273)
    at org.asynchttpclient.netty.request.NettyRequestSender.abort(NettyRequestSender.java:473)
    at org.asynchttpclient.netty.timeout.TimeoutTimerTask.expire(TimeoutTimerTask.java:43)
    at org.asynchttpclient.netty.timeout.RequestTimeoutTimerTask.run(RequestTimeoutTimerTask.java:50)
    at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
    at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
    at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Request timeout to localhost/127.0.0.1:9192 after 1000 ms
    ... 7 more
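If the timeout does come from blocking inside the callback, one way to avoid it without switching threads is to not block at all and chain the second call with thenCompose. The sketch below is an assumption-laden illustration using only the JDK: the call helper and the single "io-thread" scheduler are hypothetical stand-ins for the HTTP client. In terms of the spec, this would correspond to a variant of callExternalBlocking that returns the CompletableFuture instead of calling join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ComposeInsteadOfBlockDemo {

    /** Hypothetical non-blocking call: the I/O thread completes the future after 100 ms. */
    static CompletableFuture<String> call(ScheduledExecutorService ioThread) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        ioThread.schedule(() -> { cf.complete("done"); }, 100, TimeUnit.MILLISECONDS);
        return cf;
    }

    /** Chains two calls on a single I/O thread without ever blocking that thread. */
    static String chainWithThenCompose() throws Exception {
        ScheduledExecutorService ioThread =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "io-thread"));
        try {
            return call(ioThread)
                    // The callback only starts the second call and returns its future,
                    // so the I/O thread is free again to deliver the second response.
                    .thenCompose(first -> call(ioThread))
                    // Only the caller's thread waits here, never the I/O thread.
                    .get(1, TimeUnit.SECONDS);
        } finally {
            ioThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("thenCompose: " + chainWithThenCompose());
    }
}
```

Compared with thenApplyAsync, this keeps the whole chain asynchronous and does not occupy a pool thread while the second request is in flight.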
