Micronaut RxHttpClient response handling

I wrote the following code to upload a file:

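// Imports assume Micronaut 1.x/2.x with RxJava 2, which matches the stack trace below.
import static io.micronaut.http.MediaType.MULTIPART_FORM_DATA_TYPE;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.multipart.MultipartBody;
import io.reactivex.Flowable;
import java.nio.file.Path;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class UploadService {

    @Inject
    @Client("${upload.url}")
    private RxHttpClient httpClient;

    // Posts the file as multipart/form-data and emits the full HTTP response.
    public Flowable<HttpResponse<UploadFileResponse>> uploadFile(Path localFile) {

        MultipartBody requestBody = MultipartBody.builder()
            .addPart("file", localFile.getFileName().toString(), localFile.toFile())
            .build();

        return httpClient.exchange(
            HttpRequest.POST("/upload", requestBody).contentType(MULTIPART_FORM_DATA_TYPE),
            UploadFileResponse.class
        );
    }
}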

I am trying to call it inside the map operator of another stream:

.....
return files
    .filter(file -> file.getLocalFile() != null)
    .parallel(transferParallelism)
    .runOn(Schedulers.io())
    .map(file ->
        file.withUploaded(
            uploadService.uploadFile(file.getLocalFile())
                .doOnError(throwable -> log.error("File " + file.getLocalFile().getFileName().toString() + " is not uploaded"))
                .map(it -> it.getBody(UploadFileResponse.class)
                    .map(UploadFileResponse::isUploaded)
                    .orElse(false))
                .blockingFirst()
        )
    )
    .sequential().toList()
    .map(....);
.......

If my storage is unavailable, I get the following stack trace:

io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
    at io.reactivex.internal.operators.parallel.ParallelJoin$JoinSubscription.onError(ParallelJoin.java:191)
    at io.reactivex.internal.operators.parallel.ParallelJoin$JoinInnerSubscriber.onError(ParallelJoin.java:527)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onError(RxInstrumentedSubscriber.java:66)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onError(ParallelMap.java:131)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:117)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
    at io.reactivex.internal.operators.parallel.ParallelRunOn$RunOnSubscriber.run(ParallelRunOn.java:273)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
    at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:72)
    at io.reactivex.Flowable.blockingFirst(Flowable.java:5699)
    at com.wefi.commcache.FwCommCacheUpdater.lambda$updateCommCachePartitions$3(FwCommCacheUpdater.java:77)
    at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:113)
... 9 more
Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
    at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
    at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:65)
... 12 more

It looks like .doOnError() does not work. I am new to RxJava. Can someone explain how to handle this? Am I calling my code correctly to get a boolean result?
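
For context on the question above: in RxJava 2, doOnError is a side-effect hook. It sees the error but does not consume it, so the error still reaches blockingFirst(), which rethrows it inside the outer map. One plausible reading of the trace is that this first failure cancels the parallel flow, the other rails are then interrupted while blocked, and their InterruptedException has no subscriber left to go to. The sketch below is not the original service. It assumes a hypothetical fakeUpload stand-in for uploadService.uploadFile, and the names DoOnErrorSketch, uploadOrFalse and uploadAll are made up for illustration. It shows one way to turn a failed upload into false with onErrorReturnItem, and to compose the inner Flowable with concatMap instead of blocking inside map.

```java
import io.reactivex.Flowable;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class DoOnErrorSketch {

    // Counts how many times the doOnError side effect ran (stands in for log.error).
    static final AtomicInteger loggedErrors = new AtomicInteger();

    // Hypothetical stand-in for uploadService.uploadFile(...):
    // fails for names starting with "bad", succeeds otherwise.
    static Flowable<Boolean> fakeUpload(String name) {
        return name.startsWith("bad")
            ? Flowable.error(new IllegalStateException("storage unavailable"))
            : Flowable.just(true);
    }

    // doOnError only observes the failure; onErrorReturnItem replaces it with `false`,
    // so nothing downstream ever sees an exception.
    static Flowable<Boolean> uploadOrFalse(String name) {
        return fakeUpload(name)
            .doOnError(t -> loggedErrors.incrementAndGet())
            .onErrorReturnItem(false);
    }

    // Composes the inner Flowable with concatMap, so no blocking call runs inside map.
    static List<Boolean> uploadAll(List<String> names) {
        return Flowable.fromIterable(names)
            .concatMap(DoOnErrorSketch::uploadOrFalse)
            .toList()
            .blockingGet(); // blocking only at the very edge, for the demo
    }

    public static void main(String[] args) {
        System.out.println(uploadAll(List.of("a.bin", "bad.bin", "c.bin"))); // [true, false, true]
        System.out.println(loggedErrors.get()); // 1
    }
}
```

concatMap keeps the input order and runs one upload at a time. If concurrency like transferParallelism is needed, flatMap(mapper, maxConcurrency) is one option, at the cost of results arriving in completion order.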
