微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 kotlin 协程创建轮询机制?

如何解决如何使用 kotlin 协程创建轮询机制?

我正在尝试使用 sharedFlow 使用 kotlin 协程创建一种轮询机制,并希望在没有订阅者时停止并在至少有一个订阅者时处于活动状态。我的问题是,在这种情况下,sharedFlow 是正确的选择还是我应该使用 channel。我尝试使用 channelFlow 但我不知道如何关闭块体外部的通道(而不是 cancel 作业)。有人可以帮忙吗?这是片段。

 fun poll(id: String) = channelFlow {
            while (!isClosedForSend) {
                try {
                    send(repository.getDetails(id))
                    delay(MIN_REFRESH_TIME_MS)
                } catch (throwable: Throwable) {
                    Timber.e("error -> ${throwable.message}")
                }
                invokeOnClose { Timber.e("channel flow closed.") }
        }
    } 

解决方法

首先,当你调用channelFlow(block)时,不需要手动关闭通道。块执行完成后通道会自动关闭。

我认为“生产”协程构建器功能可能是您所需要的。但不幸的是,它仍然是一个实验性的 api。

fun poll(id: String) = someScope.produce {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val channel = poll("hello")

    channel.receive()

    channel.cancel()
}

如果不调用返回的channel的receive()方法,produce函数就会挂起,所以不需要延迟。

更新:使用 broadcast 在多个 ReceiveChannel 之间共享值。

fun poll(id: String) = someScope.broadcast {
    invokeOnClose { Timber.e("channel flow closed.") }

    while (true) {
        try {
            send(repository.getDetails(id))
//          delay(MIN_REFRESH_TIME_MS)   //no need
        } catch (throwable: Throwable) {
            Timber.e("error -> ${throwable.message}")
        }
    }
}

fun main() = runBlocking {
    val broadcast = poll("hello")

    val channel1 = broadcast.openSubscription()
    val channel2 = broadcast.openSubscription()
    
    channel1.receive()
    channel2.receive()

    broadcast.cancel()
}
,

您可以使用 SharedFlow,它以广播方式发出值(在所有收集器都使用前一个值之前不会发出新值)。

val sharedFlow = MutableSharedFlow<String>()
val scope = CoroutineScope(Job() + Dispatchers.IO)
var producer: Job()

scope.launch {
    val producer = launch() {
            sharedFlow.emit(...)
    }

    sharedFlow.subscriptionCount
              .map {count -> count > 0}
              .distinctUntilChanged()
              .collect { isActive -> if (isActive) stopProducing() else startProducing()
}

fun CoroutineScope.startProducing() {
    producer = launch() {
        sharedFlow.emit(...)
    }
        
}

fun stopProducing() {
    producer.cancel()
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。