如何解决计算在有异步和没有异步的情况下在 akka 流中完成流所需的时间
我想计算 akka 流完成所需的时间 object Demo 扩展 App {
implicit val system = ActorSystem("MyDemo")
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis
System.out.println(elapsedtime)
val flowA = Flow[String].map { element ⇒
println(s"Flow A : $element ${Thread.currentThread().getName()}" )
Thread.sleep(1000)
element
}
val flowB = Flow[String].map { element ⇒
println(s"Flow B : $element ${Thread.currentThread().getName()}" )
Thread.sleep(1000)
element
}
val flowC = Flow[String].map { element ⇒
println(s"Flow C : $element ${Thread.currentThread().getName()}" )
Thread.sleep(1000)
element
}
import system.dispatcher
val completion = Source(List("Java","Scala","C++"))
.via(flowA)
.via(flowB)
.via(flowC)
.runWith(Sink.foreach(s ⇒ println("Got output " + s)))
val stopTime = System.currentTimeMillis
val elapsedtime = stopTime - startTime
println(elapsedtime)
completion.onComplete(_ => system.terminate())
0
113
Flow A : Java MyDemo-akka.actor.default-dispatcher-4
Flow B : Java MyDemo-akka.actor.default-dispatcher-4
Flow C : Java MyDemo-akka.actor.default-dispatcher-4
Got output Java
Flow A : Scala MyDemo-akka.actor.default-dispatcher-4
Flow B : Scala MyDemo-akka.actor.default-dispatcher-4
Flow C : Scala MyDemo-akka.actor.default-dispatcher-4
Got output Scala
Flow A : C++ MyDemo-akka.actor.default-dispatcher-4
Flow B : C++ MyDemo-akka.actor.default-dispatcher-4
Flow C : C++ MyDemo-akka.actor.default-dispatcher-4
Got output C++
- 在流完成之前打印经过的时间
113
,不清楚原因。我想在流完成处理后打印经过的时间 - 我们如何计算完成流处理所需的时间,因为我想比较使用
.map
与将.map
替换为.async
所花费的时间的结果
解决方法
运行流是异步的。对于
val completion =
// omitted for brevity
.runWith(Sink.foreach(s => println(s"Got output $s")))
completion
是一个 Future[Done]
(Sink.foreach
的物化值),当流成功完成(未来将是如果流失败则失败)。这行代码实际上是完整的,一旦流被具体化并开始执行,就会继续执行。
您只需将计算经过时间的代码移动到 Done
上的 onComplete
回调中,即可获得所用时间的上限。
completion
请注意,此回调将在流完成后的某个时刻执行,但不能保证它会立即执行。也就是说,只要您运行它的系统和 JVM 的负载不高,就足够了。
另外两点值得注意:
-
completion.onComplete { _ => // there's only one possible value here,so we don't need it val stopTime = System.currentTimeMillis() val elapsedTime = stopTime - startTime println(elapsedTime) system.terminate() }
真的不应该用于可靠的基准测试:它甚至不能保证是单调的(它可以倒退)。为此,currentTimeMillis
通常更可靠。 - 在
System.nanoTime
之前获取开始时间可能更现实,否则您还要测量构建流的“蓝图”的时间,而不仅仅是实现和运行流的时间。
我试着建立一个图表来衡量流动时间,也许它可以帮助你。
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow,GraphDSL,Source,Unzip,Zip}
object TimedFlow {
def apply[In,Out](innerFlow: Flow[In,Out,NotUsed],func: (Long,Long) => Any): Flow[In,NotUsed] = {
val flowWithLong = Flow.fromGraph(GraphDSL.create() {
implicit builder =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val unzip = builder.add(Unzip[In,Long]())
val zip = builder.add(Zip[Out,Long]())
unzip.out0 ~> innerFlow ~> zip.in0
unzip.out1 ~> zip.in1
FlowShape(unzip.in,zip.out)
})
Flow[In]
.map(in => (in,System.currentTimeMillis()))
.via(flowWithLong)
.via(Flow[(Out,Long)].map {
case (out,beginTime) =>
val endTime = System.currentTimeMillis()
func(beginTime,endTime)
out
})
}
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("QuickStart")
val source: Source[Int,NotUsed] = Source(1 to 100)
implicit val ec = system.dispatcher
val plusOneFlowWithTimePrint = TimedFlow(plusOneFlow(),(beginTime: Long,endTime: Long) => {
println(s"begin ${beginTime} end ${endTime}")
println(s"end - begin: ${endTime - beginTime}")
})
val done = source.via(plusOneFlowWithTimePrint).runForeach(println)
done.onComplete(_ => system.terminate())
}
def plusOneFlow(): Flow[Int,Int,NotUsed] = {
Flow[Int]
.map {
x =>
Thread.sleep(50)
x + 1
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。