微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

scala – Spark数据帧中的序列

我在Spark中有数据框.看起来像这样:

+-------+----------+-------+
|  value|     group|     ts|
+-------+----------+-------+
|      A|         X|      1|
|      B|         X|      2|
|      B|         X|      3|
|      D|         X|      4|
|      E|         X|      5|
|      A|         Y|      1|
|      C|         Y|      2|
+-------+----------+-------+

Endgoal:我想找到有多少序列A-B-E(一个序列只是后续行的列表).增加的约束条件是序列的后续部分最多可以相隔n行.让我们考虑这个例子,n是2.

考虑组X.
在这种情况下,B和E之间恰好有1个D(忽略多个连续的B).这意味着B和E相隔1行,因此存在序列A-B-E

我曾考虑使用collect_list(),创建一个字符串(如DNA)并使用带有正则表达式的子字符串搜索.但我想知道是否有更优雅的分布式方式,也许使用窗口函数

编辑:

请注意,提供的数据框只是一个示例.真实的数据帧(以及组)可以是任意长的.

解决方法

编辑回答@Tim的评论修复模式“AABE”

是的,使用窗口函数有帮助,但我创建了一个id来订购:

val df = List(
  (1,"A","X",1),(2,"B",2),(3,3),(4,"D",4),(5,"E",5),(6,"Y",(7,"C",2)
).toDF("id","value","group","ts")

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy('group).orderBy('id)

然后lag将收集所需的内容,但是需要一个函数生成Column表达式(注意拆分以消除“AABE”的重复计数.警告:这会拒绝“ABAEXX”类型的模式):

def createSeq(m:Int) = split(
  concat(
    (1 to 2*m)
      .map(i => coalesce(lag('value,-i).over(w),lit("")))
  :_*),"A")(0)


val m=2
val tmp = df
  .withColumn("seq",createSeq(m))

+---+-----+-----+---+----+
| id|value|group| ts| seq|
+---+-----+-----+---+----+
|  6|    A|    Y|  1|   C|
|  7|    C|    Y|  2|    |
|  1|    A|    X|  1|BBDE|
|  2|    B|    X|  2| BDE|
|  3|    B|    X|  3|  DE|
|  4|    D|    X|  4|   E|
|  5|    E|    X|  5|    |
+---+-----+-----+---+----+

由于Column API中可用的集合函数很少,因此使用UDF可以更容易地避免使用正则表达式

def patternInSeq(m: Int) = udf((str: String) => {
  var notFound = str
    .split("B")
    .filter(_.contains("E"))
    .filter(_.indexOf("E") <= m)
    .isEmpty
  !notFound
})

val res = tmp
  .filter(('value === "A") && (locate("B",'seq) > 0))
  .filter(locate("B",'seq) <= m && (locate("E",'seq) > 1))
  .filter(patternInSeq(m)('seq))
  .groupBy('group)
  .count
res.show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

泛化(超出范围)

如果你想推广更长的字母序列,那么问题必须推广.这可能是微不足道的,但在这种情况下,应拒绝类型(“ABAE”)的模式(见注释).因此,最简单的推广方法是在下面的实现中使用成对规则(我添加一个组“Z”来说明这个算法的行为)

val df = List(
  (1,( 8,"Z",( 9,(10,(11,(12,5)
).toDF("id","ts")

首先,我们定义一对的逻辑

import org.apache.spark.sql.DataFrame
def createSeq(m:Int) = array((0 to 2*m).map(i => coalesce(lag('value,lit(""))):_*)
def filterPairUdf(m: Int,t: (String,String)) = udf((ar: Array[String]) => {
  val (a,b) = t
  val foundAt = ar
    .dropWhile(_ != a)
    .takeWhile(_ != a)
    .indexOf(b)
  foundAt != -1 && foundAt <= m
})

然后我们定义一个应用这个逻辑的函数迭代地应用于数据帧

def filterSeq(seq: List[String],m: Int)(df: DataFrame): DataFrame = {
  var a = seq(0)
  seq.tail.foldLeft(df){(df: DataFrame,b: String) => {
    val res  = df.filter(filterPairUdf(m,(a,b))('seq))
    a = b
    res
  }}
}

由于我们首先对从第一个字符开始的序列进行过滤,因此获得了简化和优化

val m = 2
val tmp = df
  .filter('value === "A") // reduce problem
  .withColumn("seq",createSeq(m))

scala> tmp.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  6|    A|    Y|  1|   [A,C,]|
|  8|    A|    Z|  1|[A,B,D,E]|
|  1|    A|    X|  1|[A,E]|
+---+-----+-----+---+---------------+

val res = tmp.transform(filterSeq(List("A","E"),m))

scala> res.show()
+---+-----+-----+---+---------------+
| id|value|group| ts|            seq|
+---+-----+-----+---+---------------+
|  1|    A|    X|  1|[A,E]|
+---+-----+-----+---+---------------+

(变换是DataFrame => DataFrame转换的简单糖涂层)

res
  .groupBy('group)
  .count
  .show

+-----+-----+
|group|count|
+-----+-----+
|    X|    1|
+-----+-----+

正如我所说,在扫描序列时有不同的方法来概括“重置规则”,但是这个例子有望帮助实现更复杂的序列.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐