如何将带有动态键的 JSON 结构转换为数组
我试图在 Spark 中解析 JSON 数据,发现当某些子文档包含动态键时很难解析。之前已经问过这个问题,并且也提供了我的答案:How to parse dynamic Json with dynamic keys inside it in Scala。但是我的解决方法不好:当动态子文档的数量增加时,我必须为每个子文档单独编写循环。我附上了可运行的代码,如果有人能指导我,那将是很大的帮助。
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
object Stackoverflow {

  /**
   * Entry point: builds a local SparkSession, reads a JSON document whose
   * "advertisers" object uses dynamic keys (one struct field per advertiser
   * id, since spark.read.json turns each distinct key into a field), then
   * recursively flattens the result and prints it.
   */
  def main(args: Array[String]): Unit = {
    System.setProperty("hadoop.home.dir", "C:\\hadoop")

    val spark = SparkSession
      .builder
      .appName("ConfluentConsumer")
      .master("local[2]")
      .getOrCreate()

    import spark.implicits._

    val jsonStringDs = spark.createDataset[String](
      Seq(
        ("{\n\t\"advertisers\": {\n\t\t\"5e8f445b4d9c004170771a34\": {\n\t\t\t\"_id\": {\n\t\t\t\t\"oid\": \"5e8f445b4d9c004170771a34\"\n\t\t\t},\n\t\t\t\"name\": \"33389_Advertiser001\"\n\t\t},\n\t\t\"5e8f445b4d9c004170771a35\": {\n\t\t\t\"_id\": {\n\t\t\t\t\"oid\": \"5e8f445b4d9c004170771a35\"\n\t\t\t},\n\t\t\t\"name\": \"33389_Advertiser002\"\n\t\t}\n\t}\n}")
      ))

    val df = spark.read.json(jsonStringDs)

    /**
     * Recursively flattens a DataFrame: the first ArrayType column found is
     * exploded (explode_outer, so empty arrays keep their row), the first
     * StructType column found has each child promoted to a top-level column
     * named "parent_child". Recursion stops when no complex column remains.
     *
     * Because dynamic JSON keys become ordinary struct fields in the inferred
     * schema, this handles any number of dynamic sub-documents without a
     * per-document loop.
     */
    def flattenDataframe(df: DataFrame): DataFrame = {
      val fields     = df.schema.fields
      val fieldNames = fields.map(_.name)

      // Find the first complex column; None means the frame is already flat.
      val expanded = fields.collectFirst {
        case StructField(name, _: ArrayType, _, _) =>
          // Keep every other column untouched; replace the array column with
          // its exploded rows. Backticks protect names with special chars.
          val keep  = fieldNames.filterNot(_ == name).map(n => s"`$n`")
          val exprs = keep :+ s"explode_outer(`$name`) as `$name`"
          flattenDataframe(df.selectExpr(exprs: _*))

        case StructField(name, struct: StructType, _, _) =>
          // Promote each child of the struct to a top-level column, renaming
          // "parent.child" to "parent_child" to avoid ambiguous dotted names.
          val children = struct.fieldNames.map(child => s"$name.$child")
          val newNames = fieldNames.filterNot(_ == name) ++ children
          val renamed  = newNames.map(n => col(n).as(n.replace(".", "_")))
          flattenDataframe(df.select(renamed: _*))
      }

      expanded.getOrElse(df)
    }

    val flattenedJson = flattenDataframe(df)
    flattenedJson.show(10, false)
  }
}
Json输入
{
"advertisers": {
"5e8f445b4d9c004170771a34": {
"_id": {
"oid": "5e8f445b4d9c004170771a34"
},"name": "33389_Advertiser001"
},"5e8f445b4d9c004170771a35": {
"_id": {
"oid": "5e8f445b4d9c004170771a35"
},"name": "33389_Advertiser002"
}
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。