微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

检查一个特定事件,然后检查另一个特定事件,并使用RxJava发出成功

如何解决检查一个特定事件,然后检查另一个特定事件,并使用RxJava发出成功

我需要检查一个无限的可观察事件(来自设备的事件)是否发出特定事件,让我们将其称为“已启动”,然后再调用一个“已完成”。但是,在这两个事件之间,可以接收任意数量的不同事件,因此必须将其忽略。这样的结果应该是Completable.complete(),当在设置的超时之前“开始”事件之后接着“完成”事件时,该成功。

对于这个问题,我有一个可行的解决方案,但是它看起来很丑陋而且太复杂了,我认为可能有一个更优雅/更简单的解决方案。我当前的代码看起来像这样,我已经对代码进行了概括,因此更容易理解,基本上在此示例中,我检查了Flowable在发出10秒超时之前发出数字“ 5”之后是否收到数字“ 8” 。:

    Flowable<Long> events = Flowable.interval(1,TimeUnit.SECONDS,testScheduler)
            .publish().autoConnect(1);

    return events
            .filter(number -> number == 5)
            .firstElement()
            .concatMapCompletable(number -> {
                if (number == 5) {
                    return events
                            .filter(number2 -> number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 -> {
                                if (number2 == 8) {
                                    return Completable.complete();
                                } else {
                                    return Completable.error(new Exception("Number 3 expected,got " + number2));
                                }
                            });
                } else {
                    return Completable.error(new Exception("Number 2 expected,got " + number));
                }
            })
            .timeout(10,Completable.error(new Exception("Timeout!")));

编辑: 我找到了一个更干净的版本,但是由于我使用.filter运算符然后在收到的第一个元素上完成时显得很奇怪,我将其发布在下面以供参考:

    Flowable<Long> events = Flowable.interval(1,testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .filter(number -> number == 5)
            .firstElement()
            .concatMapCompletable(number ->
                    events
                            .filter(number2 -> number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 ->
                                    Completable.complete()))
            .timeout(10,Completable.error(new Exception("Timeout!")))
            .test();

UPDATE2: 我更满意的版本:

    Flowable<Long> events = Flowable.interval(1,testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .skipwhile(number -> number != 5)
            .firstElement()
            .flatMapCompletable(number -> Completable.fromObservable(events
                    .takeuntil(number2 -> number2 == 8)
                    .toObservable()
            ));

解决方法

我不确定要确切地做什么,但是您可以使用bufferwindow运算符,如下所示:

Flowable.just(1,2,3,4,5)
        .buffer(2,1)
        .filter(e -> e.size() > 1)
        .flatMapCompletable(e -> {
            int first = e.get(0);
            int second = e.get(1);
            if (first == 2) {
                if (second == 3) {
                    return Completable.complete();
                } else {
                    return Completable.error(new Exception("..."));
                }
            }

            return Completable.fromObservable(Observable.just(e));
        })

更新

Observable<Long> source = Observable.interval(1,TimeUnit.SECONDS)
        .share();

source
        .skipWhile(e -> e != 5)
        .flatMapCompletable(e -> Completable.fromObservable(source
                .takeUntil(x -> x == 8)
                .timeout(10,TimeUnit.SECONDS)))
        .subscribe();

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。