如何解决使用Java进行大约20个HTTP调用并将数据传递到数据库
我有20个项目的集合,我将为这些项目创建一个循环并进行API调用以获取数据,基于返回的数据,我将不得不在数据库中进行更新。这个要求很简单,我可以用普通的Java完成。
现在,为了提高性能,我正在学习使用RxJava
。我浏览了互联网上的许多文章,发现人们使用async-http-client
库进行异步http调用,我发现该库已过时,维护者正在计划移交给其他人,例如RxJava库中给出的一个也像2014年开发的那样。由于我是RxJava的新手,请您帮我找到正确的方法。
我目前正在获取所有数据,并转换为如下所示的可观察对象
Observable<ENV> envs= Observable.fromIterable(allEnvs);
我还需要获得一些帮助,例如上面的代码很好,还是应该为可观察的结构创建以下代码,这是groovy的代码段,我必须用Java编写。
val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
emitter.onNext("Hello World")
emitter.onComplete()
})
请帮助我选择最佳方法
解决方法
想象一下,http调用由下面的类表示:
public class HttpCall implements Callable<String> {
private final int i;
private HttpCall(int i) {
this.i = i;
}
@Override
public String call() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Something for : " + i;
}
}
等待2秒钟,然后发出一个字符串(http调用结果)。
要合并来自不同http调用的所有项,我们可以使用merge
运算符。但是在此之前,我们需要使用Callable
运算符将Observable
转换为fromCallable
。
void sequentially() {
List<Observable<String>> httpRequests = IntStream.range(0,20)
.mapToObj(HttpCall::new)
.map(Observable::fromCallable)
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}
因为所有请求都在同一线程上执行,所以顺序保持不变:
已用时间:1602122218-表示:0。在线程上执行:主
经过的时间:1602122220-用于:1.在线程上执行:main
经过的时间:1602122222-用于:2.在线程上执行:main
...
如您所见,这些项目相隔2秒。
要在自己的线程中运行每个请求,我们需要告诉Rx,每个调用都需要一个线程。轻而易举,只需切换到建议的调度程序之一即可。 IO是我们需要的(因为它是IO操作)。
void parallel( {
List<Observable<String>> httpRequests = IntStream.range(0,20)
.mapToObj(HttpCall::new)
.map(httpCall -> Observable.fromCallable(httpCall)
.subscribeOn(Schedulers.io())
) // take a thread from the IO pool
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}
这次不保证订单,并且几乎同时生产:
已用时间:1602123707-用于:2.在线程上执行:RxCachedThreadScheduler-3
经过的时间:1602123707-用于:0。在线程上执行:RxCachedThreadScheduler-1
经过的时间:1602123707-用于以下目的:1.在线程上执行:RxCachedThreadScheduler-1
...
代码可以像这样缩短:
Observable.range(0,20)
.map(HttpCall::new)
.flatMap(httpCall -> Observable.fromCallable(httpCall).subscribeOn(Schedulers.io()))
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
merge
在幕后使用flatMap
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。