如何解决从 Scala 中的 StructType 中提取行标记模式以解析嵌套的 XML 更新
我正在尝试使用 spark-xml 库将广泛的嵌套 XML 文件解析为 DataFrame。
这是一个缩写的架构定义 (XSD):
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ItemExport">
<xs:complexType>
<xs:sequence>
<xs:element name="Item">
<xs:complexType>
<xs:sequence>
<xs:element name="ITEM_ID" type="xs:integer" />
<xs:element name="CONTEXT" type="xs:string" />
<xs:element name="TYPE" type="xs:string" />
...
<xs:element name="CLASSIFICATIONS">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="unbounded" name="CLASSIFICATION">
<xs:complexType>
<xs:sequence>
<xs:element name="CLASS_SCHEME" type="xs:string" />
<xs:element name="CLASS_LEVEL" type="xs:string" />
<xs:element name="CLASS_CODE" type="xs:string" />
<xs:element name="CLASS_CODE_NAME" type="xs:string" />
<xs:element name="EFFECTIVE_FROM" type="xs:dateTime" />
<xs:element name="EFFECTIVE_TO" type="xs:dateTime" />
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
包含数据的 XML 文件看起来像这样:
<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
<TIMEZONE>PT</TIMEZONE>
<Item>
<ITEM_ID>56</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
</Item>
...
<Item>
<ITEM_ID>763</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
<CLASSIFICATIONS>
<CLASSIFICATION>
<CLASS_SCHEME>AAU</CLASS_SCHEME>
<CLASS_LEVEL>1</CLASS_LEVEL>
<CLASS_CODE>14</CLASS_CODE>
<CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
<EFFECTIVE_FROM />
<EFFECTIVE_TO />
</CLASSIFICATION>
</CLASSIFICATIONS>
</Item>
<ItemExport>
现在,很清楚 RowTag
需要为 Item
,但我遇到了有关 XSD 的问题。行架构封装在文档架构中。
import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.functions._
val inputFile = "dbfs:/samples/ItemExport.xml"
val schema = XSDToSchema.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
val df1 = spark.read.option("rowTag","Item").xml(inputFile)
val df2 = spark.read.schema(schema).xml(inputFile)
我基本上想获取根元素下Item下的struct
,而不是整个文档架构。
schema.printTreeString
root
|-- ItemExport: struct (nullable = false)
| |-- Item: struct (nullable = false)
| | |-- ITEM_ID: integer (nullable = false)
| | |-- CONTEXT: string (nullable = false)
| | |-- TYPE: string (nullable = false)
...(a few more fields...)
| | |-- CLASSIFICATIONS: struct (nullable = false)
| | | |-- CLASSIFICATION: array (nullable = false)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- CLASS_SCHEME: string (nullable = false)
| | | | | |-- CLASS_LEVEL: string (nullable = false)
| | | | | |-- CLASS_CODE: string (nullable = false)
| | | | | |-- CLASS_CODE_NAME: string (nullable = false)
| | | | | |-- EFFECTIVE_FROM: timestamp (nullable = false)
| | | | | |-- EFFECTIVE_TO: timestamp (nullable = false)
在上面的例子中,使用文档模式解析会产生一个空的 DataFrame:
df2.show()
+-----------+
| ItemExport|
+-----------+
+-----------+
虽然推断的模式基本上是正确的,但它只能在嵌套列存在时推断它们(情况并非总是如此):
df1.show()
+----------+--------------------+----------+---------------+
| ITEM_ID| CONTEXT| TYPE|CLASSIFICATIONS|
+----------+--------------------+----------+---------------+
| 56| Sample | Product| {null}|
| 57| Sample | Product| {null}|
| 59| Part | Component| {null}|
| 60| Part | Component| {null}|
| 61| Sample | Product| {null}|
| 62| Sample | Product| {null}|
| 63| Assembly | Product| {null}|
df1.printSchema
root
|-- ITEM_ID: long (nullable = true)
|-- CONTEXT: string (nullable = false)
|-- TYPE: string (nullable = true)
...
|-- CLASSIFICATIONS: struct (nullable = true)
| |-- CLASSIFICATION: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CLASS_CODE: long (nullable = true)
| | | |-- CLASS_CODE_NAME: string (nullable = true)
| | | |-- CLASS_LEVEL: long (nullable = true)
| | | |-- CLASS_SCHEME: string (nullable = true)
| | | |-- EFFECTIVE_FROM: string (nullable = true)
| | | |-- EFFECTIVE_TO: string (nullable = true)
如 here 和 XML library docs 中所述(“用于单独验证每行 XML 的 XSD 文件的路径”),我可以将给定的行级架构解析为如:
import org.apache.spark.sql.types._
val structschema = StructType(
Array(
StructField("ITEM_ID",IntegerType,false),StructField("CONTEXT",StringType,StructField("TYPE",)
)
val df_struct = spark.read.schema(structschema).option("rowTag","Item").xml(inputFile)
不过,我想从 XSD 获取嵌套列的架构。鉴于 schema
,如何解决这个问题?
版本信息:Scala 2.12
、Spark 3.1.1
、spark-xml 0.12.0
解决方法
XSD 中的列必须或不为空 & XML 文件中的某些列为空以匹配 XSD 和 XML 文件内容,将架构从 nullable=false
更改为 nullable=true
试试下面的代码。
import com.databricks.spark.xml.util.XSDToSchema
import com.databricks.spark.xml._
import java.nio.file.Paths
import org.apache.spark.sql.functions._
val inputFile = "dbfs:/samples/ItemExport.xml"
从 XSD 获取架构,将相同架构应用于空数据帧以获取所需列。
val schema = spark
.createDataFrame(
spark
.sparkContext
.emptyRDD[Row],XSDToSchema
.read(Paths.get("/dbfs/samples/ItemExport.xsd"))
)
.select("ItemExport.Item.*")
.schema
val df2 = spark.read
.option("rootTag","ItemExport")
.option("rowTag","Item")
.schema(setNullable(schema,true)) // To match XSD & XML file content setting all columns are optional i.e nullable=true
.xml(inputFile)
以下函数将更改所有列 optional
或 nullable=true
def setNullable(schema: StructType,nullable:Boolean = false): StructType = {
def recurNullable(schema: StructType): Seq[StructField] =
schema.fields.map{
case StructField(name,dtype: StructType,_,meta) =>
StructField(name,StructType(recurNullable(dtype)),nullable,meta)
case StructField(name,dtype: ArrayType,meta) => dtype.elementType match {
case struct: StructType => StructField(name,ArrayType(StructType(recurNullable(struct)),true),meta)
case other => StructField(name,other,meta)
}
case StructField(name,dtype,meta)
}
StructType(recurNullable(schema))
}
,
很高兴您发现我的 post 有点用! :).
我不确定这是否是您要查找的内容,但我注意到在您的情况下,您还可以让 spark-xml 从 xml 推断架构。
以这个xml为例
<?xml version="1.0" encoding="utf-8"?>
<ItemExport>
<TIMEZONE>PT</TIMEZONE>
<Item>
<ITEM_ID>56</ITEM_ID>
<CONTEXT>Sample</CONTEXT>
<TYPE>Product</TYPE>
</Item>
<Item>
<ITEM_ID>763</ITEM_ID>
<CONTEXT>Sample763</CONTEXT>
<TYPE>Product2</TYPE>
<CLASSIFICATIONS>
<CLASSIFICATION>
<CLASS_SCHEME>AAU</CLASS_SCHEME>
<CLASS_LEVEL>1</CLASS_LEVEL>
<CLASS_CODE>14</CLASS_CODE>
<CLASS_CODE_NAME>BizDev</CLASS_CODE_NAME>
<EFFECTIVE_FROM/>
<EFFECTIVE_TO/>
</CLASSIFICATION>
<CLASSIFICATION>
<CLASS_SCHEME>AXU</CLASS_SCHEME>
<CLASS_LEVEL>2</CLASS_LEVEL>
<CLASS_CODE>16</CLASS_CODE>
<CLASS_CODE_NAME>BizProd</CLASS_CODE_NAME>
<EFFECTIVE_FROM/>
<EFFECTIVE_TO/>
</CLASSIFICATION>
</CLASSIFICATIONS>
</Item>
</ItemExport>
还有这个火花代码片段,
var df = spark.read
.option("mode","FAILFAST")
.option("nullValue","")
.option("rootTag","ItemExport")
.option("rowTag","Item")
.option("ignoreSurroundingSpaces","true")
// .schema(schema)
.xml("pathTo/testing.xml")
.selectExpr("*")
df.printSchema()
df.show()
我得到以下架构:
|-- CLASSIFICATIONS: struct (nullable = true)
| |-- CLASSIFICATION: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- CLASS_CODE: long (nullable = true)
| | | |-- CLASS_CODE_NAME: string (nullable = true)
| | | |-- CLASS_LEVEL: long (nullable = true)
| | | |-- CLASS_SCHEME: string (nullable = true)
| | | |-- EFFECTIVE_FROM: string (nullable = true)
| | | |-- EFFECTIVE_TO: string (nullable = true)
|-- CONTEXT: string (nullable = true)
|-- ITEM_ID: long (nullable = true)
|-- TYPE: string (nullable = true)
它似乎也适用于以下 XSD:
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="ITEM_ID" type="xs:integer"/>
<xs:element name="CONTEXT" type="xs:string"/>
<xs:element name="TYPE" type="xs:string"/>
<xs:element minOccurs="0" name="CLASSIFICATIONS">
<xs:complexType>
<xs:sequence>
<xs:element maxOccurs="unbounded" name="CLASSIFICATION">
<xs:complexType>
<xs:sequence>
<xs:element name="CLASS_SCHEME" type="xs:string"/>
<xs:element name="CLASS_LEVEL" type="xs:string"/>
<xs:element name="CLASS_CODE" type="xs:string"/>
<xs:element name="CLASS_CODE_NAME" type="xs:string"/>
<xs:element minOccurs="0" name="EFFECTIVE_FROM" type="xs:dateTime"/>
<xs:element minOccurs="0" name="EFFECTIVE_TO" type="xs:dateTime"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
如果您想将嵌套的 CLASSIFICATION 行作为数据帧中的实际行,您似乎可以选择使用 explode_outer
函数(不确定性能/内存使用对此的影响)
因此,您可以执行以下操作:
// Starting transformation
import spark.implicits._
import org.apache.spark.sql.functions.explode_outer
var df = spark.read
.option("mode","true")
.schema(schema) // notice I'm using the XSD this time :)
.xml("pathTo/testing.xml")
.select($"ITEM_ID",$"CONTEXT",$"TYPE",explode_outer($"CLASSIFICATIONS.CLASSIFICATION"))
.select($"ITEM_ID",$"col.CLASS_SCHEME",$"col.CLASS_LEVEL",$"col.CLASS_CODE",$"col.CLASS_CODE_NAME",$"col.EFFECTIVE_FROM",$"col.EFFECTIVE_TO")
df.printSchema()
df.show()
在这种情况下,我的 DataFrame 显示以下结果
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
|ITEM_ID| CONTEXT| TYPE|CLASS_SCHEME|CLASS_LEVEL|CLASS_CODE|CLASS_CODE_NAME|EFFECTIVE_FROM|EFFECTIVE_TO|
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
| 56| Sample| Product| null| null| null| null| null| null|
| 763|Sample763|Product2| AAU| 1| 14| BizDev| null| null|
| 763|Sample763|Product2| AXU| 2| 16| BizProd| null| null|
+-------+---------+--------+------------+-----------+----------+---------------+--------------+------------+
我希望这对您的用例有所帮助。
更新
我修改了 XSD,minOccurs="0" 使参数可选,仅在根据您作为示例提供的 XML 似乎缺少的字段中才需要,这些是 (CLASSIFICATIONS,EFFECTIVE_FROM,EFFECTIVE_TO)>
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。