How to parse JSON with a variable schema from a DataFrame column
I have a DataFrame with a single column containing nested JSON with a variable schema — that is, the JSON in each row can have a different set of keys.
For example:
Key Value
1 {"foo":"bar"}
2 {"key1":"val1","key2":"val2"}
I need to parse this and build a final DataFrame whose columns are the union of all keys across the JSON schemas of every row, with each row's values filled in and missing keys set to null, like this:
Key foo key1 key2
1 bar null null
2 null val1 val2
Solution

First, build a sample DataFrame holding the JSON strings:
val data = Seq((1,"""{"foo":"bar"}"""),(2,"""{"key1":"val1","key2":"val2"}"""),(3,"""{"key1":"val1","key3":"val3","key4":"val4"}"""))
val df = spark.createDataFrame(
data
).toDF("num","keyvalue")
df.show()
Output:
+---+-------------------------------------------+
|num|keyvalue                                   |
+---+-------------------------------------------+
|1  |{"foo":"bar"}                              |
|2  |{"key1":"val1","key2":"val2"}              |
|3  |{"key1":"val1","key3":"val3","key4":"val4"}|
+---+-------------------------------------------+
Next, convert the values in the keyvalue column into a Scala Map object; call the result mapped_df:
import scala.util.parsing.json._
import org.apache.spark.sql.functions.{col,udf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
val stringToMap = udf((str: String) => JSON.parseFull(str).get.asInstanceOf[Map[String,String]])
val mapped_df = df.withColumn("mapped", stringToMap(col("keyvalue")))
mapped_df.show(false)
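As an aside, the hand-rolled UDF above can usually be replaced by Spark's built-in from_json with a MapType schema (available since Spark 2.1). A minimal sketch, reusing the same df and column names from this answer:

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{MapType, StringType}

// Parse each JSON string directly into a Map[String,String] column,
// avoiding the scala.util.parsing.json UDF entirely.
val mapped_df_builtin = df.withColumn("mapped",
  from_json(col("keyvalue"), MapType(StringType, StringType)))
mapped_df_builtin.show(false)
```

This avoids scala.util.parsing.json, which is deprecated in newer Scala versions, and lets Spark handle malformed rows (they become null) instead of throwing inside the UDF.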
Output (mapped_df):
+---+-------------------------------------------+------------------------------------------+
|num|keyvalue                                   |mapped                                    |
+---+-------------------------------------------+------------------------------------------+
|1  |{"foo":"bar"}                              |[foo -> bar]                              |
|2  |{"key1":"val1","key2":"val2"}              |[key1 -> val1, key2 -> val2]              |
|3  |{"key1":"val1","key3":"val3","key4":"val4"}|[key1 -> val1, key3 -> val3, key4 -> val4]|
+---+-------------------------------------------+------------------------------------------+
Now create the new DataFrame schema by collecting all the unique keys from the mapped column above:
var schema = List(StructField("number",IntegerType))
val col_rdd = mapped_df.select(col("mapped")).rdd.map(x => {
val maps: Map[String,String] = x.getAs[Map[String,String]]("mapped")
val m = maps.map(x => x._1)
m
})
// distinct keys across all rows, sorted for a stable column order
val schem = col_rdd.flatMap(x => x).distinct().collect().sorted
val new_schema = schem.toList.map(x => StructField(x,StringType,true))
schema = schema ++ new_schema
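The key-collection step above boils down to flattening the per-row key sets and de-duplicating them. A minimal pure-Scala illustration (no Spark) of that logic, using the sample rows from this answer:

```scala
// Each row's parsed JSON becomes a Map; the union of all key sets
// gives the columns of the final schema.
val rows = List(
  Map("foo" -> "bar"),
  Map("key1" -> "val1", "key2" -> "val2"),
  Map("key1" -> "val1", "key3" -> "val3", "key4" -> "val4")
)
val uniqueKeys = rows.flatMap(_.keys).distinct.sorted
println(uniqueKeys)  // List(foo, key1, key2, key3, key4)
```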
The collected schema:
schema: List[org.apache.spark.sql.types.StructField] = List(StructField(number,IntegerType,true), StructField(foo,StringType,true), StructField(key1,StringType,true), StructField(key2,StringType,true), StructField(key3,StringType,true), StructField(key4,StringType,true))
Now that the schema is built, convert mapped_df to an RDD and reshape each row to line up with the new schema, then create the new DataFrame from it:
val df_rdd = mapped_df.rdd.map(row => {
val num = List(row.getAs[Int]("num"))
val map_val: Map[String,String] = row.getAs[Map[String,String]]("mapped")
val new_cols = schem.toList.map(x => map_val.getOrElse(x,null))
Row.fromSeq(num ++ new_cols)
})
val new_dataframe = spark.createDataFrame(df_rdd,StructType(schema))
new_dataframe.show(false)
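The per-row reshaping relies on Map.getOrElse to pad missing keys with null. A small pure-Scala sketch of that padding, using one of the sample rows and the collected key list from above:

```scala
// Column order is fixed by the collected schema keys;
// keys absent from a row's map become null in that row.
val schemaKeys = List("foo", "key1", "key2", "key3", "key4")
val rowMap = Map("key1" -> "val1", "key2" -> "val2")
val padded = schemaKeys.map(k => rowMap.getOrElse(k, null))
println(padded)  // List(null, val1, val2, null, null)
```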
Thanks!