我观察的一件事(不是双关语)当然是懒惰的执行.在下面的探索性代码中,我注意到没有任何内容被执行,直到merge.subscribe(pet – > System.out.println(pet))被调用.但令我着迷的是当我订阅第二个订阅者merge.subscribe(pet – > System.out.println(“Feed”pet))时,它再次启动了“迭代”.
我想要理解的是迭代的行为.它似乎没有像只能使用一次的Java 8流.它一字一行地经历每个String一个,并将其作为当时的值发布?在任何以前发布的订阅者接收到这些项目的新用户,如果它们是新的?
public class RxTest { public static void main(String[] args) { Observable<String> dogs = Observable.from(ImmutableList.of("Dasher","Rex")) .filter(dog -> dog.matches("D.*")); Observable<String> cats = Observable.from(ImmutableList.of("Tabby","Grumpy Cat","Meowmers","Peanut")); Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey")); Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets); merge.subscribe(pet -> System.out.println(pet)); merge.subscribe(pet -> System.out.println("Feed " + pet)); } }
解决方法
为了使任何异步/同步的可观察<使用.toBlocking()将其类型更改为BlockingObservable<T>
.而不是.subscribe()它包含用.forEach()执行每个结果的操作的新方法,或强制使用.first()
可观察是一个很好的工具,因为它们主要是确定性的(相同的输入总是产生相同的输出,除非你做错了),可重用(可以作为命令/策略模式的一部分发送),大部分都忽略同意,因为他们不应该依靠共同的国家(也就是做错事). BlockingObservables是好的,如果你试图把一个基于可观察的库到命令式语言,或者只是执行一个可观察的操作,你有100%的信心,它的管理良好.
围绕这些原则构建您的应用程序是一个范例的变化,我无法真正地回答这个答案.
*There are breaches like
Subject
andObservable.create()
that are needed to integrate with imperative frameworks.
原文地址:https://www.jb51.cc/java/125624.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。