我有一个无限的元素流,我想按id和聚合组进行分组,让我们说2秒,然后将它们发送到下游.这是一个不起作用的代码,但可以更好地解释我想要的东西:
Source .tick(0 second,50 millis,() => if (Random.nextBoolean) (1,s"A") else (2,s"B")) .map { f => f() } .groupBy(10,_._1) // how to aggregate grouped elements here for two seconds? .scan(Seq[String]()) { (x,y) => x ++ Seq(y._2) } .to(Sink.foreach(println))
期望的输出应该如下所示:
Seq(A,A,A) Seq(B,B,B) Seq(A,B) // and so on
如何使用流实现此类功能?
解决方法
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。