微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用Akka流一对多合并

如何解决使用Akka流一对多合并

我有一个用例,其中有一个要从数据库获取的值的列表和一个需要获取其值的日期的列表。我想使用akka流(带有GraphDSL的流或源)在它们之间建立一对多(或多对一)关系,以便为每个日期获取每个值

例如,

动物=牛,山羊,绵羊

years = 2018,2019

预期的流输出

牛&2018

山羊&2018

绵羊&2018

牛&2019

山羊&2019

绵羊&2019

解决方法

如果您想要这样的产品,则不需要Graph DSL。

def animalsAndYears(animals: Source[Animal,NotUsed],years: Source[Year,NotUsed]): Source[(Animal,Year),NotUsed] =
  years.flatMapConcat { year =>
    animals.map { animal =>
      animal -> year
    }
  }

所以:

 animalsAndYears(Source(listOfAnimals),Source(listOfYears))

将为您提供animalyear元组的流。假设您有一个函数:

 def queryDBForAnimalYear(aandy: (Animal,Year)): Future[Seq[Row]] = ???

然后,您可以使用以下命令获取行流:

val parallelism: Int = ??? // How many queries to have in-flight at a time
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
  .mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
  .mapConcat(identity)  // gives you a Source[Row]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。