微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Akka流实现价值

如何解决Akka流实现价值

我想从流程中引用实现值。下面是代码段,但未编译,错误

type mismatch;
 found   : (akka.NotUsed,scala.concurrent.Future[akka.Done])
 required: (Playground.DomainObj,scala.concurrent.Future[akka.Done])

代码

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import scala.concurrent.Future
import akka.NotUsed
import akka.Done

implicit val actorSystem = ActorSystem("example")

case class DomainObj(name: String,age: Int)

 val customFlow1:Flow[String,DomainObj,NotUsed] = Flow[String].map(s => {
    DomainObj(s,50)
  })

  val customFlow2 = Flow[DomainObj].map(s => {
    s.age + 10
  })

val printAnySink: Sink[Any,Future[Done]] = Sink.foreach(println)

val c1 = Source.single("John").viaMat(customFlow1)(Keep.right).viaMat(customFlow2)(Keep.left).toMat(printAnySink)(Keep.both)

val res: (DomainObj,Future[Done]) = c1.run()

在操场上找到代码https://scastie.scala-lang.org/P9iSx49cQcaOZfKtVCzTPA

我想在流完成后引用DomainObj /

解决方法

Flow[String,DomainObj,NotUsed]的物化值为NotUsed,而不是DomainObj,因此c1的物化值为(NotUsed,Future[Done])

看起来这里的目的是捕获在DomainObj中创建的customFlow1。可以通过

val customFlow1: Flow[String,Future[DomainObj]] =
  Flow[String]
    .map { s => DomainObj(s,50) }
    .alsoTo(Sink.head)

val res: (Future[DomainObj],Future[Done]) = c1.run()

请注意,Sink.head实际上要求customFlow1只能在只能发射一次的物体的下游使用。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。