微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法将解析的 xml 数据插入到具有更改输入架构的 spark 增量表中

如何解决无法将解析的 xml 数据插入到具有更改输入架构的 spark 增量表中

我正在尝试将数据帧中的数据插入到增量表中。最初,我正在解析基于目标模式的 xml 文件并将结果保存到数据帧中。下面是用于解析的代码

def parseAsset (nodeSeqXml: scala.xml.NodeSeq) : Seq[String] = {
  //convert nodeseq to xml
  
 
  
  Seq(  (nodeSeqXml \ "AMS").\@("Pro"),(nodeSeqXml \ "AMS").\@("Prod"),(nodeSeqXml \ "AMS").\@("Asset"),(nodeSeqXml \ "AMS").\@("Descrn"),(nodeSeqXml \ "AMS").\@("Creation_Dt"),(nodeSeqXml \ "AMS").\@("Provider"),(nodeSeqXml \ "AMS").\@("AssetD"),(nodeSeqXml \ "AMS").\@("lass"),(nodeSeqXml \ "AMS").\@("hyu"),((nodeSeqXml \ "App_Data" ).map(d => ((d \\ "@Name").text + "@-" + (d \\ "@Value").text))).mkString("!-"))
}


val AssetXml = XML.loadFile("filepath/filename")
 
val MetadatanodeSeqLst = (AssetXml \\ "Metadata")
var records: Seq[String] = Seq()
 //for each of Metadata tag
MetadatanodeSeqLst.foreach(nodeSeqXml => {
  records = records :+ parseAsset(nodeSeqXml).mkString("%-")
})


val AssetDF = records.toDF("ETY_Asset")

在这一步之后,我将拆分列并分解数组列,最后将数据保存到数据帧中,然后我使用下面的方法将此数据插入到增量表中。

outputparse.write.format("delta").mode("append").option("mergeSchema","true").insertInto("targettable")

如果源文件的列数与目标文件的列数相同,这可以正常工作。但在这种情况下,会有不同模式的不同文件将作为输入传递给解析代码。例如,目标架构有 77 列,如果传入文件有 65 列,并且在将数据插入增量表时,我会收到以下错误

org.apache.spark.sql.AnalysisException: Cannot write to 'target',not enough data columns; target table has 74 column(s) but the inserted data has 65 column(s);

像这样,我得到具有不同输入模式的文件,但我的目标模式是不变的。所以,基本上我需要将 Null 传递给缺失的字段。我知道在将数据写入数据帧之前,我需要在解析代码中进行架构比较。能否请您告诉我如何实现这一点以及在解析代码中的何处合并此逻辑。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。