微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

RxJava可观察到的“迭代”如何工作?

我开始用Rx Java和ReactFX玩,我变得非常着迷.但是正在试验我有几十个问题,我不断研究答案.

我观察的一件事(不是双关语)当然是懒惰的执行.在下面的探索性代码中,我注意到没有任何内容被执行,直到merge.subscribe(pet – > System.out.println(pet))被调用.但令我着迷的是当我订阅第二个订阅者merge.subscribe(pet – > System.out.println(“Feed”pet))时,它再次启动了“迭代”.

我想要理解的是迭代的行为.它似乎没有像只能使用一次的Java 8流.它一字一行地经历每个String一个,并将其作为当时的值发布?在任何以前发布的订阅者接收到这些项目的新用户,如果它们是新的?

public class RxTest {

    public static void main(String[] args) {

        Observable<String> dogs = Observable.from(ImmutableList.of("Dasher","Rex"))
                .filter(dog -> dog.matches("D.*"));

        Observable<String> cats = Observable.from(ImmutableList.of("Tabby","Grumpy Cat","Meowmers","Peanut"));

        Observable<String> ferrets = Observable.from(CompletableFuture.supplyAsync(() -> "Harvey"));

        Observable<String> merge = dogs.mergeWith(cats).mergeWith(ferrets);

        merge.subscribe(pet -> System.out.println(pet));


        merge.subscribe(pet -> System.out.println("Feed " + pet));

    }
}

解决方法

可观察到的< T>代表一个monad,一个链接的操作,而不是执行操作本身.它是描述性语言,而不是你习惯的命令.要执行一个操作,你.subscribe()到它.每次订阅新的执行流都是从头开始创建的.除非您使用.subscribeOn()或.observeOn()指定线程更改,否则不要将流与线程混淆,因为预订将同步执行.您将新元素链接到任何现有操作/ monad / Observable以添加 new behaviour,例如更改线程,过滤,累积,转换等.如果您的可观察是昂贵的操作,则不想在每个订阅上重复,您可以防止使用.cache()进行娱乐.

为了使任何异步/同步的可观察<使用.toBlocking()将其类型更改为BlockingObservable<T>.而不是.subscribe()它包含用.forEach()执行每个结果的操作的新方法,或强制使用.first()

可观察是一个很好的工具,因为它们主要是确定性的(相同的输入总是产生相同的输出,除非你做错了),可重用(可以作为命令/策略模式的一部分发送),大部分都忽略同意,因为他们不应该依靠共同的国家(也就是做错事). BlockingObservables是好的,如果你试图把一个基于可观察的库到命令式语言,或者只是执行一个可观察的操作,你有100%的信心,它的管理良好.

围绕这些原则构建您的应用程序是一个范例的变化,我无法真正地回答这个答案.

*There are breaches like Subject and Observable.create() that are needed to integrate with imperative frameworks.

原文地址:https://www.jb51.cc/java/125624.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐