如何解决如何在 Akka 中将刻度源与内部源流相结合
我希望创建一个调度程序,它为每个给定的时间段打勾并从多个表中传输数据。如果下游接收器变慢,我希望自动收报机也变慢。有了它,我可以提供背压。
所以总的来说,我结合了一个解决方案,但我结束了 Source[Source[_]]
,因为外部 Source 是独立于内部 Source 性能的代码。
这是一个简化版本的代码,我创建了 Slick 源来从数据库中获取数据;
val source = Source.tick(initialDelay.second,1.second,"ss")
.map(_ => getSegments)
.mapConcat { case segments =>
segments.map { s =>
val newUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_data".as[Int]).map(id => (id,s.newData))
val allUsers = Slick.source(sql"SELECT hguser_id FROM sys_app_${s.segment.appId}_someotherdata".as[Int]).map(id => (id,s.wholeSegment))
Source.combine(newUsers,allUsers)(Merge(_))
}
}
最终我想从多个表中流式传输数据。
谢谢。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。