微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Akka流按来源单积累

如何解决Akka流按来源单积累

我正在尝试使用akka流来积累数据并用作批处理:

val myFlow: Flow[String,Unit,NotUsed] = Flow[String].collect {
    case record =>
      println(record)
      Future(record)
  }.mapAsync(1)(x => x).groupedWithin(3,30 seconds)
    .mapAsync(10)(records =>
      someBatchOperation(records))
    )

我对以上代码的期望是直到3条记录准备就绪或30秒过去才进行任何操作。但是,当我使用Source.single("test")发送一些请求时,它正在处理此记录,而无需等待其他人或30秒。

如何使用此流程等待其他记录到来或闲置30秒?

记录是一个API请求一个一个地发出的,而我试图像这样在流中累积这些数据:

Source.single(apiRecord).via(myFlow).runWith(Sink.ignore)

解决方法

它实际上是这样做的。让我们考虑以下内容:

Source(Stream.from(1)).throttle(1,400 milli).groupedWithin(3,1 seconds).runWith(Sink.foreach(i => println(s"Done with ${i} ${System.currentTimeMillis}")))

在我杀死进程之前,该行的输出是:

Done with Vector(1,2,3) 1599495716345
Done with Vector(4,5) 1599495717348
Done with Vector(6,7,8) 1599495718330
Done with Vector(9,10) 1599495719350
Done with Vector(11,12,13) 1599495720330
Done with Vector(14,15) 1599495721350
Done with Vector(16,17,18) 1599495722328
Done with Vector(19,20) 1599495723348
Done with Vector(21,22,23) 1599495724330

我们可以看到,每次发射2个元素到3个元素之间的时间差都超过1秒。这是有道理的,因为经过1秒钟的延迟后,到达打印线要花更多的时间。

我们每次发射2个元素到3个元素之间的差小于一秒。因为它有足够的元素继续下去。

为什么在您的示例中它不起作用?

使用Source.single时,源会为其自身添加一个完整的阶段。您可以在source code of akka中看到它。 在这种情况下,groupedWithin流知道不会再有任何元素,因此可以发出"test"字符串。为了实际测试此流,请尝试创建更大的流。

使用Source(1到10)时,它实际上转换为Source.Single,这也完成了流。我们可以看到here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。