我需要做一些与此
https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala非常相似的事情
我的问题是我有一个未知数量的组,如果mapAsync的并行数少于我得到的组数和最后一个数组中的错误
Tearing down
SynchronousFileSink(/Users/sam/dev/projects/akka-streams/target/log-ERROR.txt)
due to upstream error
(akka.stream.impl.StreamSubscriptionTimeoutSupport$$anon$2)
我试图在akka streams http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html的模式指南中建议使用缓冲区
groupBy { case LoglevelPattern(level) => level case other => "OTHER" }.buffer(1000,OverflowStrategy.backpressure). // write lines of each group to a separate file mapAsync(parallelism = 2) {....
但结果相同
解决方法
扩展jrudolph的评论是完全正确的……
在这个实例中,您不需要mapAsync.作为一个基本示例,假设您有元组的来源
import akka.stream.scaladsl.{Source,Sink} def data() = List(("foo",1),("foo",2),("bar",3),2)) val originalSource = Source(data)
然后,您可以执行groupBy以创建Source of Sources
def getID(tuple : (String,Int)) = tuple._1 //a Source of (String,Source[(String,Int),_]) val groupedSource = originalSource groupBy getID
每个分组的源可以仅与地图并行处理,不需要任何花哨的东西.以下是每个分组在独立流中求和的示例:
import akka.actor.ActorSystem import akka.stream.ACtorMaterializer implicit val actorSystem = ActorSystem() implicit val mat = ActorMaterializer() import actorSystem.dispatcher def getValues(tuple : (String,Int)) = tuple._2 //does not have to be a def,we can re-use the same sink over-and-over val sumSink = Sink.fold[Int,Int](0)(_ + _) //a Source of (String,Future[Int]) val sumSource = groupedSource map { case (id,src) => id -> {src map getValues runWith sumSink} //calculate sum in independent stream }
现在所有的“foo”数字都与所有“bar”数字并行求和.
当你有一个返回Future [T]的封装函数并且你试图发出一个T时,会使用mapAsync;在你的问题中并非如此.此外,mapAsync涉及waiting for results,而不是reactive ……
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。