如何解决订阅后连接到ConnectableFlowable
我正在使用RxJava2进行远程数据源的响应处理:我有 import json
# Decoding json
data = json.loads({"id": 1,"interviewer": "hengtw1","incidenttwg1": {"id": 5,"child_occupation": [6]}})
print(data["incidenttwg1"]["child_occupation"])
# this will print [6] (list)
print(data["incidenttwg1"]["child_occupation"][0])
# this will print 6 (list item)
的远程数据源,其中Publisher<T>
是数据项,我的使用者是T
,现在我想通过接受Publisher<T>
将其保存在本地来处理远程源。
这些功能的原型是:
Publisher<T>
我正在使用 public static void main(String[] args) throws Exception {
accept(fetch(remote()).flatMapPublisher(x -> x))
.blockingAwait();
System.out.println("done");
}
/**
* Prototype of remote data source.
* @return Publisher from network
*/
private static Single<? extends Publisher<Integer>> remote() {
return Single.just(Flowable.just(1,2,3,4));
}
/**
* Prototype of consumer: accepts reactive data stream and print to stdout.
* @param data Data source
*/
private static Completable accept(Publisher<Integer> data) {
return Flowable.frompublisher(data).doOnNext(x -> System.out.printf("received %s\n",x))
.ignoreElements();
}
/**
* Fetch remote resource from network and save locally.
* @param remote Publisher of data from network
*/
private static Single<? extends Publisher<Integer>> fetch(Single<? extends Publisher<Integer>> remote) {
return remote.flatMap(
pub -> {
final ConnectableFlowable<Integer> conPub = Flowable.frompublisher(pub).publish();
saveLocally(conPub).subscribe(
// we don't care if `saveLocally` fails,just continue fetching
() -> System.out.println("Saved locally successfully"),err -> System.out.printf("Failed to save locally: %s\n",err)
);
return Single.fromCallable(
() -> {
conPub.connect();
return conPub;
}
);
}
);
}
private static Completable saveLocally(Publisher<Integer> source) {
// emulating save operation
return Flowable.frompublisher(source)
.flatMapCompletable(x -> Completable.fromAction(() -> System.out.printf("save: %d\n",x)));
}
在本地保存远程ConnectableFlowable
并返回给消费者,但是当我使用Publisher
返回发行人{{1} }方法显示:
ConnectableFlowable.connect()
消费者没有收到任何数据吗?
我发现调用main()
有一个肮脏的解决方法,但要使此原型正常工作有些延迟:
save: 1
save: 2
save: 3
save: 4
Saved locally successfully
通过这种方式,消费者可以接收所有数据:
connect()
但是,延迟修复代码并不可靠。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。