How to handle Micronaut RxHttpClient responses
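import static io.micronaut.http.MediaType.MULTIPART_FORM_DATA_TYPE;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.client.RxHttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.multipart.MultipartBody;
import io.reactivex.Flowable;
import java.nio.file.Path;
import javax.inject.Inject;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Singleton
public class UploadService {
    @Inject
    @Client("${upload.url}")
    private RxHttpClient httpClient;

    // Posts the file as multipart/form-data and emits the typed response.
    public Flowable<HttpResponse<UploadFileResponse>> uploadFile(Path localFile) {
        MultipartBody requestBody = MultipartBody.builder()
                .addPart("file", localFile.getFileName().toString(), localFile.toFile())
                .build();
        return httpClient.exchange(
                HttpRequest.POST("/upload", requestBody).contentType(MULTIPART_FORM_DATA_TYPE),
                UploadFileResponse.class
        );
    }
}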
I am trying to call it from inside another map operator:
.....
return files
        .filter(file -> file.getLocalFile() != null)
        .parallel(transferParallelism)
        .runOn(Schedulers.io())
        .map(file ->
                file.withUploaded(
                        uploadService.uploadFile(file.getLocalFile())
                                .doOnError(throwable -> log.error("File " + file.getLocalFile().getFileName().toString() + " is not uploaded"))
                                .map(it -> it.getBody(UploadFileResponse.class)
                                        .map(UploadFileResponse::isuploaded)
                                        .orElse(false))
                                .blockingFirst()
                )
        )
        .sequential().toList()
        .map(....);
.......
If my storage is unavailable, I get the following stack trace:
io.reactivex.exceptions.UndeliverableException: The exception could not be delivered to the consumer because it has already canceled/disposed the flow or the exception has nowhere to go to begin with. Further reading: https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling | java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:367)
at io.reactivex.internal.operators.parallel.ParallelJoin$JoinSubscription.onError(ParallelJoin.java:191)
at io.reactivex.internal.operators.parallel.ParallelJoin$JoinInnerSubscriber.onError(ParallelJoin.java:527)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onError(RxInstrumentedSubscriber.java:66)
at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onError(ParallelMap.java:131)
at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:117)
at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onNext(RxInstrumentedSubscriber.java:59)
at io.reactivex.internal.operators.parallel.ParallelRunOn$RunOnSubscriber.run(ParallelRunOn.java:273)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)
at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:72)
at io.reactivex.Flowable.blockingFirst(Flowable.java:5699)
at com.wefi.commcache.FwCommCacheUpdater.lambda$updateCommCachePartitions$3(FwCommCacheUpdater.java:77)
at io.reactivex.internal.operators.parallel.ParallelMap$ParallelMapSubscriber.onNext(ParallelMap.java:113)
... 9 more
Caused by: java.lang.InterruptedException
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:232)
at io.reactivex.internal.subscribers.BlockingBaseSubscriber.blockingGet(BlockingBaseSubscriber.java:65)
... 12 more
It looks like .doOnError() is not working.
I am new to RxJava. Can someone explain how to handle this?
Am I calling my code correctly to get a boolean result?
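For reference, one direction that may be worth trying is sketched below. It rests on two points: doOnError only observes an error and still passes it downstream, so it does not turn a failure into a value; and calling blockingFirst() inside a parallel map means that when one rail fails and the flow is cancelled, the other blocked threads are probably interrupted, which would explain the wrapped InterruptedException in the trace. The sketch replaces the blocking call with flatMap and recovers with onErrorReturnItem(false). It assumes RxJava 2 on the classpath; the class UploadFlagSketch and the method fakeUpload are hypothetical stand-ins for the real service, not part of the code above.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class UploadFlagSketch {

    // Hypothetical stand-in for uploadService.uploadFile(...): emits an optional
    // "uploaded" flag, or fails when the simulated storage is unavailable.
    static Flowable<Optional<Boolean>> fakeUpload(String name) {
        if (name.startsWith("bad")) {
            return Flowable.error(new IllegalStateException("storage unavailable"));
        }
        return Flowable.just(Optional.of(true));
    }

    // Maps every file name to "name=true" or "name=false" without blocking
    // inside the parallel rails.
    static List<String> uploadAll(List<String> names, int parallelism) {
        return Flowable.fromIterable(names)
                .parallel(parallelism)
                .runOn(Schedulers.io())
                .flatMap(name -> fakeUpload(name)
                        .map(body -> body.orElse(false))
                        // Side effect only: the error keeps travelling downstream.
                        .doOnError(t -> System.err.println("File " + name + " is not uploaded: " + t))
                        // Recovery: replace the error with a 'false' result.
                        .onErrorReturnItem(false)
                        .map(ok -> name + "=" + ok))
                .sequential()
                // Rails complete in no fixed order, so sort for a stable result.
                .sorted()
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(uploadAll(Arrays.asList("a.txt", "bad.txt", "c.txt"), 2));
    }
}
```

In the real pipeline the same shape would presumably wrap uploadService.uploadFile(file.getLocalFile()) and finish with file.withUploaded(ok) in the last map, leaving a single blocking call (if any) at the very end of the chain.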