微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Akka Streams中的“ conditionalZip”运算符

如何解决Akka Streams中的“ conditionalZip”运算符

假设我有两个来源:

val first = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)

是否可以创建一个仅根据条件配对元素的zip?我的意思是:

first.conditionalZip(second,_ == _) // if that method exited

代码将从first源中获取元素,并从second删除元素,直到有一个满足条件的元素,然后输出一个元组。上述调用的结果为(1,1),(2,2),(4,4),(6,6)

解决方法

考虑压缩两个Source,然后使用statefulMapConcat根据条件函数转换压缩的元素,如下所示:

import akka.stream.scaladsl._
import akka.NotUsed

def popFirstMatch(ls: List[Int],condF: Int => Boolean): (Option[Int],List[Int]) = {
  ls.find(condF) match {
    case None =>
      (None,ls)
    case Some(e) => 
      val idx = ls.indexOf(e)
      if (idx < 0)
        (None,ls)
      else {
        val (l,r) = ls.splitAt(idx)
        (r.headOption,l ++ r.tail)
      }
  }
}

def conditionalZip( first: Source[Int,NotUsed],second: Source[Int,filler: Int,condFcn: (Int,Int) => Boolean ): Source[(Int,Int),NotUsed] = {
    first.zipAll(second,filler,filler).statefulMapConcat{ () =>
      var prevList = List.empty[Int]
      tuple => tuple match { case (e1,e2) =>
        if (e2 != filler) {
          if (e1 != filler && condFcn(e1,e2))
            (e1,e2) :: Nil
          else {
            if (e1 != filler)
              prevList :+= e1
            val (opElem,rest) = popFirstMatch(prevList,condFcn(_,e2))
            prevList = rest
            opElem match {
              case None => Nil
              case Some(e) => (e,e2) :: Nil
            }
          }
        }
        else
          Nil
      }
    }
}

正在测试:

import akka.actor.ActorSystem
implicit val system = ActorSystem("system")
implicit val ec = system.dispatcher

// Example 1:
val first = Source(1 :: 2 :: 4 :: 6 :: Nil)
val second = Source(1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: Nil)

conditionalZip(first,second,Int.MinValue,_ == _).runForeach(println) 
// (1,1)
// (2,2)
// (4,4)
// (6,6)

conditionalZip(first,_ > _).runForeach(println) 
// (4,3)
// (6,4)

conditionalZip(first,_ < _).runForeach(println) 
// (1,2)
// (2,3)
// (4,5)
// (6,7)

// Example 2:
val first = Source(3 :: 9 :: 5 :: 5 :: 6 :: Nil)
val second = Source(1 :: 3 :: 5 :: 2 :: 5 :: 6 :: Nil)

conditionalZip(first,_ == _).runForeach(println)
// (3,3)
// (5,5)
// (5,_ > _).runForeach(println)
// (3,1)
// (9,2)
// (6,5)

conditionalZip(first,_ < _).runForeach(println)
// (3,6)

一些注意事项:

  1. 方法zipAll(在Akka Stream 2.6 +上可用)压缩两个Source,同时用提供的“ filler”值填充较少的元素。在这种情况下,这些填充剂没有意义,因此应为它们指定与实际元素不同的值。

  2. prevList中使用内部列表statefulMapConcat来存储来自第一个源的元素,以便在后续迭代中与来自第二个源的元素进行比较。如果源中的元素不同,可以用List替换Set,以获得更好的查找性能。

  3. 方法popFirstMatch用于提取prevList中与提供的部分条件函数匹配的第一个元素,并返回类型为Option的元素的Tuple和其余的List。

  4. 请注意,这只是statefulMapConcat可能是所描述问题的解决方案的说明。如果没有详细实现以涵盖所有情况或完善相当宽泛的条件函数(Int,Int) => Boolean的范围,示例代码的行为可能不一定符合确切的要求。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。