微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用RxJava,Room和翻新版实现单一真相SSOT

如何解决使用RxJava,Room和翻新版实现单一真相SSOT

我正在使用Room作为数据库从网络API加载数据并将其存储在数据库中。我想将数据库用作单一事实来源(SSOT)。我已经创建了一个使用Room和LiveData实现此体系结构的功能

这是我的应用场景:

  1. 数据库中加载存储的数据(如果存在)并更新UI。
  2. 从API加载更新的数据。
  3. 将数据保存到数据库中。
  4. 作为本地数据更新自动更新UI。

在此函数中,我从本地数据创建LiveData实例,并使用emitSource()发出数据。那么我只需要从网络加载数据并将其保存到数据库中,我的LiveData就会自动被UI消耗的更新数据触发。

这是我的功能...

fun <T,A> resultLiveData(
    databaseQuery: () -> LiveData<T>,networkCall: suspend () -> Response<A>,saveCallResult: suspend (A) -> Unit
): LiveData<Result<T>> = liveData(dispatchers.IO) {
    // emit Loading status
    emit(Result.loading<T>())

    // emit data from database
    val localSource = databaseQuery.invoke().map { Result.success(it) }
    emitSource(localSource)

    // try loading data from network and handle possible network errors
    val response = getResult { networkCall.invoke() }
    if (response.status == Result.Status.SUCCESS) {
        /* save data loaded from network into database. as we are using LiveData
           observers will be automatically triggered on database change */
        saveCallResult(response.data!!)
    } else if (response.status == Result.Status.ERROR) {
        emit(Result.error<T>(response.message,response.error))
    }
}

这对LiveData很好用。现在,我想使用RxJava实现类似于此功能功能。我的意思是我需要创建一个Observable对象,以便我可以订阅该对象,并且当我处置该对象时,所有内容都将被取消。

我已经实现了一个功能,但是无法正常工作。我从本地源加载数据并调用emitter.onNext()的部分的行为与预期不符。

fun <T,A> resultObservable(
    databaseQuery: () -> Observable<T>,networkCall: () -> Response<A>,saveCallResult: (A) -> Unit
): Observable<Result<T>> = Observable.create(ObservableOnSubscribe<Result<T>> { emitter ->
    // emit Loading status
    emitter.onNext(Result.loading<T>())

    // emit data from database
    databaseQuery.invoke().doOnNext {
        emitter.onNext(Result.success(it))
    }.subscribe()


    // try loading data from network and handle possible network errors
    val response = getResult2 { networkCall.invoke() }
    if (response.status == Result.Status.SUCCESS) {
        /* save data loaded from network into database. as we are using LiveDataz
           observers will be automatically triggered on database change */
        saveCallResult(response.data!!)
    } else if (response.status == Result.Status.ERROR) {
//        emitter.onNext(Result.error<T>(response.message,response.error))
        emitter.onError(response.error)
    }

}).subscribeOn(Schedulers.io())

我不是RxJava的专业人士,所以请给我一个好的解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。