如何解决使用Akka流一对多合并
我有一个用例,其中有一个要从数据库中获取的值的列表和一个需要获取其值的日期的列表。我想使用akka流(带有GraphDSL的流或源)在它们之间建立一对多(或多对一)关系,以便为每个日期获取每个值
例如,
动物=牛,山羊,绵羊
years = 2018,2019
预期的流输出为
牛&2018
山羊&2018
绵羊&2018
牛&2019
山羊&2019
绵羊&2019
解决方法
如果您想要这样的产品,则不需要Graph DSL。
def animalsAndYears(animals: Source[Animal,NotUsed],years: Source[Year,NotUsed]): Source[(Animal,Year),NotUsed] =
years.flatMapConcat { year =>
animals.map { animal =>
animal -> year
}
}
所以:
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
将为您提供animal
,year
元组的流。假设您有一个函数:
def queryDBForAnimalYear(aandy: (Animal,Year)): Future[Seq[Row]] = ???
然后,您可以使用以下命令获取行流:
val parallelism: Int = ??? // How many queries to have in-flight at a time
animalsAndYears(Source(listOfAnimals),Source(listOfYears))
.mapAsync(parallelism) { params => queryDBForAnimalYear(params) }
.mapConcat(identity) // gives you a Source[Row]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。