如何解决从CompletionStage返回Single的正确方法
我正在使用RxJava2,Micronaut和Cassandra处理反应流。我是rxjava的新手,不确定以最佳异步方式返回List Person的正确方法是什么?
数据来自Cassandra Dao界面
public interface PersonDAO {
@Query("SELECT * FROM cass_drop.person;")
CompletionStage<MappedAsyncPagingIterable<Person>> getAll();
}
被注入到micronaut控制器中
return Single.just(personDAO.getAll().toCompletableFuture().get().currentPage()) .subscribeOn(Schedulers.io()) .map(people -> HttpResponse.ok(people));
OR
return Single.just(HttpResponse.ok()) .subscribeOn(Schedulers.io()) .map(it -> it.body(personDAO.getAll().toCompletableFuture().get().currentPage()));
或切换到RxJava3
return Single.fromCompletionStage(personDAO.getAll()) .map(page -> HttpResponse.ok(page.currentPage())) .onErrorReturn(throwable -> HttpResponse.ok(Collections.emptyList()));
解决方法
不是RxJava和Cassandra的专业人士:
在第一个和第二个示例中,您正在阻塞用CompletionStage
执行get
的线程,即使您在IO线程中执行此操作,我也不建议这样做。
您还使用了Single
,它只能发出一个值或一个错误。由于您想返回List
,因此我建议至少要使用Observable
。
第三点,来自Cassandra的结果是分页的,我不知道这是否是故意的,但是您只列出了第一页,而错过了其他页面。
我会尝试下面的解决方案,我一直使用IO线程(该操作在IO中可能会花费很多),然后遍历Cassandra获取的页面:
/* the main method of your controller */
@Get()
public Observable<Person> listPersons() {
return next(personDAO.getAll()).subscribeOn(Schedulers.io());
}
private Observable<Person> next(CompletionStage<MappedAsyncPagingIterable<Person>> pageStage) {
return Single.fromFuture(pageStage.toCompletableFuture())
.flatMapObservable(personsPage -> {
var o = Observable.fromIterable(personsPage.currentPage());
if (!personsPage.hasMorePages()) {
return o;
}
return o.concatWith(next(personsPage.fetchNextPage()));
});
}
,
如果您打算使用 reactor 而不是 RxJava,那么您可以尝试一下 cassandra-java-driver-reactive-mapper。
语法相当简单,仅在编译时有效。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。