微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala – 如何将Akka Streams Merge的输出传输到另一个流?

我正在玩Akka Streams,并且已经找到了大部分的基础知识,但我不清楚如何获取Merge的结果并对其进行进一步的操作(map,filter,fold等).

我想修改下面的代码,以便我可以进一步操作数据,而不是将合并传递给接收器.

implicit val materializer = FlowMaterializer()

val items_a = Source(List(10,20,30,40,50))
val items_b = Source(List(60,70,80,90,100))
val sink = ForeachSink(println)

val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> sink
}.run()

我想我的主要问题是我无法弄清楚如何创建一个没有源代码的流组件,而且我无法弄清楚如何在不使用特殊的Merge对象和〜>的情况下进行合并.句法.

编辑:这个问题和答案是和Akka Streams 0.11一起使用的

解决方法

如果你不关心Merge的语义,其中元素随机地下游,那么你可以尝试在Source上连接,而不是:

items_a.concat(items_b).map(_ * 2).map(_.toString).foreach(println)

这里的不同之处在于,a中的所有项目将首先在b的任何元素之前向下游流动.如果你真的需要Merge的行为,那么你可以考虑以下内容(请记住,你最终需要一个接收器,但你可以在合并后进行额外的转换):

val items_a = Source(List(10,100))

val sink = ForeachSink[Double](println)
val transform = Flow[Int].map(_ * 2).map(_.todouble).to(sink)


val materialized = FlowGraph { implicit builder =>
  import FlowGraphImplicits._
  val merge = Merge[Int]("m1")
  items_a ~> merge
  items_b ~> merge ~> transform
}.run

在此示例中,您可以看到我使用Flow随播中的帮助程序来创建没有特定输入源的Flow.然后我可以将它附加到合并点以获得我的额外处理.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐