如何解决检测Source.queue
我希望有一个Source.queue
(或类似的东西可以将其推送到物化图上),用来告诉我队列的当前饱和度。
我想这样做,而不(重新)实现QueueSource
图形阶段提供的功能。
我想出的一种可能的解决方法是:
object InstrumentedSource {
final class InstrumentedSourceQueueWithComplete[T](
delegate: SourceQueueWithComplete[T],bufferSize: Int,)(implicit executionContext: ExecutionContext)
extends SourceQueueWithComplete[T] {
override def complete(): Unit = delegate.complete()
override def fail(ex: Throwable): Unit = delegate.fail(ex)
override def watchCompletion(): Future[Done] = delegate.watchCompletion()
private val buffered = new AtomicLong(0)
private[InstrumentedSource] def onDequeue(): Unit = {
val _ = buffered.decrementAndGet()
}
object BufferSaturationRatioGauge extends RatioGauge {
override def getRatio: RatioGauge.Ratio = RatioGauge.Ratio.of(buffered.get(),bufferSize)
}
lazy val bufferSaturationGauge: RatioGauge = BufferSaturationRatioGauge
override def offer(elem: T): Future[QueueOfferResult] = {
val result = delegate.offer(elem)
result.foreach {
case QueueOfferResult.Enqueued =>
val _ = buffered.incrementAndGet()
case _ => // do nothing
}
result
}
}
def queue[T](bufferSize: Int,overflowStrategy: OverflowStrategy)(
implicit executionContext: ExecutionContext,materializer: Materializer,): Source[T,InstrumentedSourceQueueWithComplete[T]] = {
val (queue,source) = Source.queue[T](bufferSize,overflowStrategy).preMaterialize()
val instrumentedQueue = new InstrumentedSourceQueueWithComplete[T](queue,bufferSize)
source.mapMaterializedValue(_ => instrumentedQueue).map { item =>
instrumentedQueue.onDequeue()
item
}
}
}
这似乎主要是通过一些手动测试来实现的(除了buffered
最终最终与队列中的实际项目数保持一致,在我的情况下应该是可以的),但是我想知道是否有解决方案可以更好地利用我可能错过的内置功能。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。