如何解决Akka Streams中的“ conditionalZip”运算符
假设我有两个来源:
val first = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)
是否可以创建一个仅根据条件配对元素的zip?我的意思是:
first.conditionalZip(second,_ == _) // if that method exited
该代码将从first
源中获取元素,并从second
中删除元素,直到有一个满足条件的元素,然后输出一个元组。上述调用的结果为(1,1),(2,2),(4,4),(6,6)
。
解决方法
考虑压缩两个Source,然后使用statefulMapConcat
根据条件函数转换压缩的元素,如下所示:
import akka.stream.scaladsl._
import akka.NotUsed
def popFirstMatch(ls: List[Int],condF: Int => Boolean): (Option[Int],List[Int]) = {
ls.find(condF) match {
case None =>
(None,ls)
case Some(e) =>
val idx = ls.indexOf(e)
if (idx < 0)
(None,ls)
else {
val (l,r) = ls.splitAt(idx)
(r.headOption,l ++ r.tail)
}
}
}
def conditionalZip( first: Source[Int,NotUsed],second: Source[Int,filler: Int,condFcn: (Int,Int) => Boolean ): Source[(Int,Int),NotUsed] = {
first.zipAll(second,filler,filler).statefulMapConcat{ () =>
var prevList = List.empty[Int]
tuple => tuple match { case (e1,e2) =>
if (e2 != filler) {
if (e1 != filler && condFcn(e1,e2))
(e1,e2) :: Nil
else {
if (e1 != filler)
prevList :+= e1
val (opElem,rest) = popFirstMatch(prevList,condFcn(_,e2))
prevList = rest
opElem match {
case None => Nil
case Some(e) => (e,e2) :: Nil
}
}
}
else
Nil
}
}
}
正在测试:
import akka.actor.ActorSystem
implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher
// Example 1:
val first = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)
conditionalZip(first,second,Int.MinValue,_ == _).runForeach(println)
// (1,1)
// (2,2)
// (4,4)
// (6,6)
conditionalZip(first,_ > _).runForeach(println)
// (4,3)
// (6,4)
conditionalZip(first,_ < _).runForeach(println)
// (1,2)
// (2,3)
// (4,5)
// (6,7)
// Example 2:
val first = Source(3 :: 9 :: 5 :: 5 :: 6 :: Nil)
val second = Source(1 :: 3 :: 5 :: 2 :: 5 :: 6 :: Nil)
conditionalZip(first,_ == _).runForeach(println)
// (3,3)
// (5,5)
// (5,_ > _).runForeach(println)
// (3,1)
// (9,2)
// (6,5)
conditionalZip(first,_ < _).runForeach(println)
// (3,6)
一些注意事项:
-
方法
zipAll
(在Akka Stream2.6
+上可用)压缩两个Source,同时用提供的“ filler”值填充较少的元素。在这种情况下,这些填充剂没有意义,因此应为它们指定与实际元素不同的值。 -
prevList
中使用内部列表statefulMapConcat
来存储来自第一个源的元素,以便在后续迭代中与来自第二个源的元素进行比较。如果源中的元素不同,可以用List
替换Set
,以获得更好的查找性能。 -
方法
popFirstMatch
用于提取prevList
中与提供的部分条件函数匹配的第一个元素,并返回类型为Option的元素的Tuple和其余的List。 -
请注意,这只是
statefulMapConcat
可能是所描述问题的解决方案的说明。如果没有详细实现以涵盖所有情况或完善相当宽泛的条件函数(Int,Int) => Boolean
的范围,示例代码的行为可能不一定符合确切的要求。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。