如何解决(spark-xml) 使用 from_xml 函数解析 xml 列时只接收空值
我正在尝试使用 spark-xml 解析一个非常简单的 XML 字符串列,但即使正确填充了 XML,我也只能接收到 null
值。
我用来解析 xml 的 XSD 是:
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="note">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="from"/>
<xs:element type="xs:string" name="to"/>
<xs:element type="xs:string" name="message"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
虽然列中的 XML 以字符串形式显示如下,但每个标签都正确填充:
<?xml version="1.0" encoding="UTF-8"?>
<note>
<from>Jani</from>
<to>Tove</to>
<message>Remember me this weekend</message>
</note>
我用 Scala 编写的 Spark 代码是这样的:
// XML Schema
val schema = XSDToSchema.read("<the XSD as string>")
// Spark Structured Streaming (N.b. the column value contains the xml as string)
import spark.implicits._
var df = initSource(spark)
.withColumn("parsed",from_xml($"value",schema,Map(
"mode" -> "FAILFAST","nullValue"-> "","rowTag" -> "note","ignoreSurroundingSpaces" -> "true"
)
))
.select($"value",$"parsed.note.from",$"parsed.note.to",$"parsed.note.message")
.writeStream
.format("console")
// .option("mode","FAILFAST")
// .option("nullValue","")
// .option("rowTag","note")
// .option("ignoreSurroundingSpaces","true")
.outputMode("append")
.start()
.awaitTermination(30*1000)
打印此数据帧的模式(在 select 语句之前)将给出预期的模式
root
|-- value: string (nullable = true)
|-- parsed: struct (nullable = true)
| |-- note: struct (nullable = false)
| | |-- from: string (nullable = false)
| | |-- to: string (nullable = false)
| | |-- message: string (nullable = false)
但是在控制台中打印结果时,我得到的只是 null
值,如下所示:
....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
| value|from| to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
....
我不认为它是相关的,但这个 xml 专栏的来源来自阅读定义如下的 Kafka 主题:
def initSource(spark: SparkSession) : DataFrame = {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("startingoffsets","earliest")
.option("subscribe","my-test-topic")
.load()
.selectExpr("CAST(value AS STRING)")
}
有没有其他人遇到过这个问题并解决了它?我的选择不多了,我真的很感激这方面的提示:)
我使用的 spark-xml 版本是最新的一个 atm,0.12.0
和 spark 3.1.1
。
更新
我在调用 writeStream
后错误地传递了 spark-xml 选项,而是需要将它们作为 from_xml
函数的第三个参数传递。我仍然只得到空值...
解决方法
最后让我大开眼界的是阅读 spark-xml documentation 中提到的部分:
XSD 文件的路径,用于单独验证每一行的 XML
这意味着模式匹配是通过每一行而不是整个 XML 完成的,在这种情况下,我的示例的模式需要类似于以下内容:
val schema = StructType(Array(
StructField("from",StringType,nullable = true),StructField("to",StructField("message",nullable = true)))
也可以使用 XSD 来完成:
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element type="xs:string" name="from"/>
<xs:element type="xs:string" name="to"/>
<xs:element type="xs:string" name="message"/>
</xs:schema>
这两种声明模式的方法对我有用。希望对以后的人有所帮助。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。