微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

从Spark数据框中的列中提取数值数据

如何解决从Spark数据框中的列中提取数值数据

我有一个包含20列的数据框,我想用从另一列提取的数据更新一个特定的列(其数据为空),并进行一些格式化。以下是示例输入

+------------------------+----+
|col1                    |col2|
+------------------------+----+
|This_is_111_222_333_test|NULL|
|This_is_111_222_444_test|3296|
|This_is_555_and_666_test|NULL|
|This_is_999_test        |NULL|
+------------------------+----+

我的输出应如下所示

+------------------------+-----------+
|col1                    |col2       |
+------------------------+-----------+
|This_is_111_222_333_test|111,222,333|
|This_is_111_222_444_test|3296       |
|This_is_555_and_666_test|555,666    |
|This_is_999_test        |999        |
+------------------------+-----------+

这是我尝试过的代码,仅当数字连续时才起作用,请您帮我解决一个问题。

df.withColumn("col2",when($"col2".isNull,regexp_replace(regexp_replace(regexp_extract($"col1","([0-9]+_)+",0),"_",","),".$","")).otherwise($"col2")).show(false)

我可以通过创建UDF来做到这一点,但是我正在考虑使用spark内置函数来实现。我的Spark版本是2.2.0

谢谢。

解决方法

在这里,UDF是一个不错的选择。性能类似于OP中提供的withColumn方法的性能(请参见下面的基准),并且即使数字不连续也可以运行,这是OP中提到的问题之一。

import org.apache.spark.sql.functions.udf
import scala.util.Try

def getNums = (c: String) =>  {
    c.split("_").map(n => Try(n.toInt).getOrElse(0)).filter(_ > 0)
}

我重新创建了您的数据,如下所示

val data = Seq(("This_is_111_222_333_test",null.asInstanceOf[Array[Int]]),("This_is_111_222_444_test",Array(3296)),("This_is_555_666_test",("This_is_999_test",null.asInstanceOf[Array[Int]]))
            .toDF("col1","col2")

data.createOrReplaceTempView("data")

注册UDF并在查询中运行

spark.udf.register("getNums",getNums)

spark.sql("""select col1,case when size(col2) > 0 then col2 else getNums(col1) end new_col 
             from data""").show

返回哪个

+--------------------+---------------+
|                col1|        new_col|
+--------------------+---------------+
|This_is_111_222_3...|[111,222,333]|
|This_is_111_222_4...|         [3296]|
|This_is_555_666_test|     [555,666]|
|    This_is_999_test|          [999]|
+--------------------+---------------+

性能已通过如下创建的更大数据集进行了测试:

val bigData = (0 to 1000).map(_ => data union data).reduce( _ union _)
bigData.createOrReplaceTempView("big_data")

因此,将OP中提供的解决方案与UDF解决方案进行了比较,发现大致相同。

// With UDF
spark.sql("""select col1,case when length(col2) > 0 then col2 else getNums(col1) end new_col 
             from big_data""").count

/// OP solution:
bigData.withColumn("col2",when($"col2".isNull,regexp_replace(regexp_replace(regexp_extract($"col1","([0-9]+_)+",0),"_",","),".$","")).otherwise($"col2")).count
,

这是另一种方法,请检查性能。

df.withColumn("col2",expr("coalesce(col2,array_join(filter(split(col1,'_'),x -> CAST(x as INT) IS NOT NULL),','))"))
  .show(false)

+------------------------+-----------+
|col1                    |col2       |
+------------------------+-----------+
|This_is_111_222_333_test|111,333|
|This_is_111_222_444_test|3296       |
|This_is_555_666_test    |555,666    |
|This_is_999_test        |999        |
+------------------------+-----------+

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。