如何解决部分 akka 流的显式吞吐量限制
我的系统中有一个流程,它从 SQS(使用 alpakka)读取一些元素并进行一些预处理(约 10 个阶段,通常总共不到 1 分钟)。然后,准备好的元素被发送到主处理(单阶段,需要几分钟)。整个过程在 AWS/K8S 上运行,我们希望在 SQS 队列增长到某个阈值以上时进行横向扩展。问题是,SQS 队列需要很长时间才能炸毁,因为有很多元素在进程中“空闲”,已经完成了它们的预处理但在等待主要的事情。
我们无法将预处理内容外部化到单独的队列中,因为它们的结果无法在反序列化往返过程中幸存下来。此外,此服务与“主”处理器深度耦合(此服务作为主的边车运行)并且无法独立扩展。
预处理阶段在技术上是 .mapAsyncUnordered
,但整个过程已经非常简单(流阶段和 SQS 批次/缓冲区)。
我们尝试降低级间缓冲区(akka.stream.materializer.max-input-buffer-size),但这只会带来一些间接的好处,没有直接控制(而且太内部了,不能乱搞,反正我的口味).
我尝试实现一个“门”包装器,它会限制某些任意 Flow
内允许的元素数量,看起来像:
class LimitingGate[T,U](originalFlow: Flow[T,U],maxInFlight: Int) {
private def in: InputGate[T] = ???
private def out: OutputGate[U] = ???
def gatedFlow: Flow[T,U,NotUsed] = Flow[T].via(in).via(originalFlow).via(out)
}
并在输入/输出门之间使用回调进行节流。
实现部分有效(流终止让我很难过),但感觉是实现实际目标的错误方式。
感谢任何想法/评论/启发性问题
谢谢!
解决方法
按照这些思路尝试一些东西(我只是在脑海中编译它):
def inflightLimit[A,B,M](n: Int,source: Source[T,M])(businessFlow: Flow[T,_])(implicit materializer: Materializer): Source[B,M] = {
require(n > 0) // alternatively,could just result in a Source.empty...
val actorSource = Source.actorRef[Unit](
completionMatcher = PartialFunction.empty,failureMatcher = PartialFunction.empty,bufferSize = 2 * n,overflowStrategy = OverflowStrategy.dropHead // shouldn't matter,but if the buffer fills,the effective limit will be reduced
)
val (flowControl,unitSource) = actorSource.preMaterialize()
source.statefulMapConcat { () =>
var firstElem: Boolean = true
{ a =>
if (firstElem) {
(0 until n).foreach(_ => flowControl.tell(())) // prime the pump on stream materialization
firstElem = false
}
List(a)
}}
.zip(unitSource)
.map(_._1)
.via(businessFlow)
.wireTap { _ => flowControl.tell(()) } // wireTap is Akka Streams 2.6,but can be easily replaced by a map stage which sends () to flowControl and passes through the input
}
基本上:
-
actorSource
会为它收到的每个Unit
发出一个()
(()
,即无意义)元素 -
statefulMapConcat
将导致n
消息仅在流首次启动时发送到actorSource
(从而允许来自源的n
元素通过) -
zip
仅在source
和()
都有可用元素时,才会传递来自actorSource
和source
的一对输入 - 对于每个退出
businessFlow
的元素,都会向actorSource
发送一条消息,这将允许来自源的另一个元素通过
注意事项:
- 这不会以任何方式限制
source
内的缓冲 -
businessFlow
不能删除元素:在删除n
元素后,流将不再处理元素但不会失败;如果需要删除元素,您可以内联businessFlow
并让删除元素的阶段在删除元素时向flowControl
发送消息;还有其他事情可以解决这个问题,您也可以这样做
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。