微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

订阅后连接到ConnectableFlowable

如何解决订阅后连接到ConnectableFlowable

我正在使用RxJava2进行远程数据源的响应处理:我有 import json # Decoding json data = json.loads({"id": 1,"interviewer": "hengtw1","incidenttwg1": {"id": 5,"child_occupation": [6]}}) print(data["incidenttwg1"]["child_occupation"]) # this will print [6] (list) print(data["incidenttwg1"]["child_occupation"][0]) # this will print 6 (list item) 的远程数据源,其中Publisher<T>是数据项,我的使用者是T,现在我想通过接受Publisher<T>将其保存在本地来处理远程源。

这些功能的原型是:

Publisher<T>

我正在使用 public static void main(String[] args) throws Exception { accept(fetch(remote()).flatMapPublisher(x -> x)) .blockingAwait(); System.out.println("done"); } /** * Prototype of remote data source. * @return Publisher from network */ private static Single<? extends Publisher<Integer>> remote() { return Single.just(Flowable.just(1,2,3,4)); } /** * Prototype of consumer: accepts reactive data stream and print to stdout. * @param data Data source */ private static Completable accept(Publisher<Integer> data) { return Flowable.frompublisher(data).doOnNext(x -> System.out.printf("received %s\n",x)) .ignoreElements(); } /** * Fetch remote resource from network and save locally. * @param remote Publisher of data from network */ private static Single<? extends Publisher<Integer>> fetch(Single<? extends Publisher<Integer>> remote) { return remote.flatMap( pub -> { final ConnectableFlowable<Integer> conPub = Flowable.frompublisher(pub).publish(); saveLocally(conPub).subscribe( // we don't care if `saveLocally` fails,just continue fetching () -> System.out.println("Saved locally successfully"),err -> System.out.printf("Failed to save locally: %s\n",err) ); return Single.fromCallable( () -> { conPub.connect(); return conPub; } ); } ); } private static Completable saveLocally(Publisher<Integer> source) { // emulating save operation return Flowable.frompublisher(source) .flatMapCompletable(x -> Completable.fromAction(() -> System.out.printf("save: %d\n",x))); } 在本地保存远程ConnectableFlowable并返回给消费者,但是当我使用Publisher返回发行人{{1} }方法显示

ConnectableFlowable.connect()

消费者没有收到任何数据吗?

我发现调用main()一个肮脏的解决方法,但要使此原型正常工作有些延迟:

save: 1
save: 2
save: 3
save: 4
Saved locally successfully

通过这种方式,消费者可以接收所有数据:

connect()

但是,延迟修复代码并不可靠。

原始代码有什么问题,我该如何正确解决

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。