如何解决Akka 流不处理我的 CSV 文件中的所有行
我正在使用 Akka 流来处理包含 1839 行的 CSV 文件。我添加了计数器来计算处理的行数。
这是我的来源,我确保输入文件中的每一行少于 700 个字符。
case class ParsedLine(input: String,field1: String,field2: String,field3: String)
val counter0 = new AtomicInteger()
val counter1 = new AtomicInteger()
val lineSource = FileIO
.fromPath(Paths.get(InputFile))
.via(Framing.delimiter(ByteString("\n"),1024,allowTruncation = true))
.map { l =>
counter0.incrementAndGet()
l.utf8String
}
val parseLine = Flow[String].map { l =>
val words = l.split(",")
ParsedLine(l,words(0),words(1),words(2))
}
这个source的处理方式如下,对应source中的每一行,输出中都应该有一个处理过的行。
val done = lineSource
.via(parseLine)
.to(Sink.foreach(_.input))
.run()
done.onComplete {
case Success(_) =>
println("Counter0 " + counter0.get())
println("Counter1 " + counter1.get())
system.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
system.terminate()
}
有趣的是计数器打印如下&每次我得到不同的数字。如果我删除 .to(Sink.foreach(_.input))
行,我得到的计数为 1839。
Counter0 1445
Counter1 1667
首先我希望 Counter0 的值高于 Counter1,因为 Counter0 在 Counter1 之前进入一个阶段,我希望所有的行都被处理并且计数器应该打印出总行数 1839。
知道在这种情况下发生了什么吗? akka 流是否会在两者之间丢弃物品?
解决方法
您实际上不是在等待流结束。
您正在使用 Sink.foreach(...)
附加 to
,这会删除 Sink.foreach
阶段的处理细节并仅保留较早阶段的处理阶段。
另外,请记住,您在每一步(via
、map
、via
和 to
)都在做同样的事情。因此,您只跟踪由 FileIO.from(...)
创建的第一个图形步骤的处理阶段。这意味着您只是在等待读取完整文件,而不会等待任何后续处理步骤。
您只需要保留两者的结果并等待它们完成即可。
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.both)
val resultFutures: (Future[IOResult],Future[Done]) = stream.run()
val resultsFuture = Future.sequence(List(resultFutures._1,resultFutures._2))
resultsFuture.onComplete {
case Success(List(ioResult,done)) =>
println(ioResult)
println(done)
println(counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
}
或者,您可以选择仅跟踪最后一个处理阶段(在本例中为 Sink.foreach(...)
)
val stream =
lineSource
.via(parseLine)
.toMat(Sink.foreach(_.input))(Keep.right)
val resuleFuture: Future[Done] = stream.run()
resuleFuture.onComplete({
case Success(_) =>
println("Counter0 " + counter0.get())
actorSystem.terminate()
case Failure(e) =>
println(e.getLocalizedMessage)
actorSystem.terminate()
})
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。