如何解决RxJava3 - 如何在使用共享运算符时将 doFinally 与线程安全操作一起使用时避免死锁?
我正在尝试根据 id 将同一个 Flowable 共享给多个订阅者,而每个订阅者都可以在需要时取消订阅以取消订阅。如果某个 id 的所有订阅者都取消订阅,然后另一个订阅者尝试订阅该 id,则应该创建一个新订阅。下面的代码是用 Kotlin 编写的,试图实现这个功能。
class SharedFlowProvider() {
private val flowProvider = FlowProvider()
private val eventFlowById = HashMap<String,Flowable<ComputedProperties>>()
@Synchronized
override fun subscribetoProperties(subscriber: CancellableSubscriber<ComputedProperties>,id: String){
eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
.subscribe(orderBasedSubscriber)
}
private fun buildFlowable(id: String) =
flowProvider.getFlowable(id)
.doFinally { removeSubscription(id) }
.share()
@Synchronized
private fun removeSubscription(id: String) {
eventFlowById.remove(id)
}
}
当在同一 ID 上取消后跟订阅时会出现问题。 由于共享返回的 FlowableRefCount 的工作方式,我发现这种方法可能存在死锁。在底层实现中,FlowableRefCount 在其 subscribeActual 和 cancel 方法中使用同步机制,虽然乍一看并不明显,但 doFinally 方法的操作在该锁定机制内运行。这有点违反直觉,因为在 doFinally 文档中它说:
请注意 onFinally 操作在订阅之间共享,因此应该是线程安全的。
因此,出现了以下场景:
- 通过取消锁定 FlowableRefCount 实例
- subscribetoProperties 使用与取消中相同的 ID 调用
- subscribetoProperties 在调用 subscribe(然后 subscribeActual)时被阻塞,因为已经通过取消在 FlowableRefCount 上获取了锁
- 调用 removeOrderSubscription 被阻止,因为 SharedFlowProvider 上的锁定已经通过 subscribetoProperties 方法获取
我还尝试通过以下方式使用 ConcurrentHashMap 从 subscribetoProperties 方法中分离锁定:
private val eventFlowById = ConcurrentHashMap<String,Flowable<ComputedProperties>>()
override fun subscribetoProperties(subscriber: CancellableSubscriber<ComputedProperties>,id: String){
val flow = eventFlowById.computeIfAbsent(id) { buildFlowable(id) }
flow.subscribe(orderBasedSubscriber)
//the flow associated with an id Could be removed by a cancel done before the subscribe call,so we need to make sure it is added to the map
eventFlowById.computeIfAbsent(id) { flow }
}
但是在这种方法中,如果我们在方法的最后一行之前进行了订阅,然后快速取消,我们最终可能会将流放入地图中,即使我们没有订阅者.
关于如何在没有死锁或竞争条件的情况下实现此功能的一些想法,我将不胜感激。
谢谢
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。