微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Java进行大约20个HTTP调用并将数据传递到数据库

如何解决使用Java进行大约20个HTTP调用并将数据传递到数据库

我有20个项目的集合,我将为这些项目创建一个循环并进行API调用获取数据,基于返回的数据,我将不得不在数据库中进行更新。这个要求很简单,我可以用普通的Java完成。

现在,为了提高性能,我正在学习使用RxJava。我浏览了互联网上的许多文章,发现人们使用async-http-client库进行异步http调用,我发现该库已过时,维护者正在计划移交给其他人,例如RxJava库中给出的一个也像2014年开发的那样。由于我是RxJava的新手,请您帮我找到正确的方法

我目前正在获取所有数据,并转换为如下所示的可观察对象

Observable<ENV> envs= Observable.fromIterable(allEnvs);

我还需要获得一些帮助,例如上面的代码很好,还是应该为可观察的结构创建以下代码,这是groovy的代码段,我必须用Java编写。

val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
    emitter.onNext("Hello World")
    emitter.onComplete()
})

请帮助我选择最佳方法

解决方法

想象一下,http调用由下面的类表示:

public class HttpCall implements Callable<String> {

    private final int i;

    private HttpCall(int i) {
        this.i = i;
    }

    @Override
    public String call() {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return "Something for : " + i;
    }
}

等待2秒钟,然后发出一个字符串(http调用结果)。

要合并来自不同http调用的所有项,我们可以使用merge运算符。但是在此之前,我们需要使用Callable运算符将Observable转换为fromCallable

void sequentially() {
    List<Observable<String>> httpRequests = IntStream.range(0,20)
            .mapToObj(HttpCall::new)
            .map(Observable::fromCallable)
            .collect(Collectors.toList());

    Observable.merge(httpRequests)
            .timestamp(TimeUnit.SECONDS)
            .subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}

因为所有请求都在同一线程上执行,所以顺序保持不变:

已用时间:1602122218-表示:0。在线程上执行:主
经过的时间:1602122220-用于:1.在线程上执行:main
经过的时间:1602122222-用于:2.在线程上执行:main
...

如您所见,这些项目相隔2秒。

要在自己的线程中运行每个请求,我们需要告诉Rx,每个调用都需要一个线程。轻而易举,只需切换到建议的调度程序之一即可。 IO是我们需要的(因为它是IO操作)。

void parallel( {
    List<Observable<String>> httpRequests = IntStream.range(0,20)
            .mapToObj(HttpCall::new)
            .map(httpCall -> Observable.fromCallable(httpCall)
                                .subscribeOn(Schedulers.io())
            ) // take a thread from the IO pool
            .collect(Collectors.toList());

    Observable.merge(httpRequests)
            .timestamp(TimeUnit.SECONDS)
            .subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}

这次不保证订单,并且几乎同时生产:

已用时间:1602123707-用于:2.在线程上执行:RxCachedThreadScheduler-3
经过的时间:1602123707-用于:0。在线程上执行:RxCachedThreadScheduler-1
经过的时间:1602123707-用于以下目的:1.在线程上执行:RxCachedThreadScheduler-1
...


代码可以像这样缩短:

Observable.range(0,20)
                .map(HttpCall::new)
                .flatMap(httpCall -> Observable.fromCallable(httpCall).subscribeOn(Schedulers.io()))
                .timestamp(TimeUnit.SECONDS)
                .subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));

merge在幕后使用flatMap

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。