微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用python或Scala将复杂的SQL查询转换为spark-dataframe

如何解决如何使用python或Scala将复杂的SQL查询转换为spark-dataframe

我已经在spark中使用sqlcontext完成了一次转换,但是我只想使用Spark Data frame编写相同的查询。该查询包括联接操作以及sql的case语句。 如下所示的SQL查询

refereshLandingData=spark.sql( "select a.Sale_ID,a.Product_ID,"
                           "CASE "
                           "WHEN (a.Quantity_Sold IS NULL) THEN b.Quantity_Sold "
                           "ELSE a.Quantity_Sold "
                           "END AS Quantity_Sold,"
                           "CASE "
                           "WHEN (a.vendor_ID IS NULL) THEN b.vendor_ID "
                           "ELSE a.vendor_ID "
                           "END AS vendor_ID,"
                           "a.Sale_Date,a.Sale_Amount,a.Sale_Currency "
                           "from landingData a left outer join preHoldData b on a.Sale_ID = b.Sale_ID" )

现在我想在scala和python中的spark数据框中使用等效代码。我尝试了一些代码,但是它
不起作用。我尝试的代码如下:

joinDf=landingData.join(preHoldData,landingData['Sale_ID']==preHoldData['Sale_ID'],'left_outer')

joinDf.withColumn\
('QuantitySold',pf.when(pf.col(landingData('Quantity_Sold')).isNull(),pf.col(preHoldData('Quantity_Sold')))
.otherwise(pf.when(pf.col(preHoldData('Quantity_Sold')).isNull())),pf.col(landingData('Quantity_Sold'))).show()

在以上代码中,连接完美完成,但大小写条件不起作用。 我得到-> TypeError:'DataFrame'对象不可调用 我正在使用spark 2.3.2版本和python 3.7,以及在spark-scala情况下类似地使用scala 2.11 请任何人建议我任何等效的代码或指导!

解决方法

这是一个scala解决方案: 假设landingDatapreHoldData是您的数据帧


 val landingDataDf = landingData.withColumnRenamed("Quantity_Sold","Quantity_Sold_ld")
 val preHoldDataDf = preHoldData.withColumnRenamed("Quantity_Sold","Quantity_Sold_phd")

 val joinDf = landingDataDf.join(preHoldDataDf,Seq("Sale_ID"))


 joinDf
 .withColumn("Quantity_Sold",when(col("Quantity_Sold_ld").isNull,col("Quantity_Sold_phd")).otherwise(col("Quantity_Sold_ld"))
 ). drop("Quantity_Sold_ld","Quantity_Sold_phd")

您可以使用相同的方式处理Vendor_id

代码的问题是,您无法在withColumn操作中引用其他/旧的数据框名称。它必须来自您正在操作的数据框。

,

下面的代码将在scala上运行,而对于python,您可能会稍作调整。

val preHoldData = spark.table("preHoldData").alias("a")
val landingData = spark.table("landingData").alias("b")

landingData.join(preHoldData,Seq("Sale_ID"),"leftouter")
.withColumn("Quantity_Sold",when(col("a.Quantity_Sold").isNull,col("b.Quantity_Sold")).otherwise(col("a.Quantity_Sold")))
.withColumn("Vendor_ID",when(col("a.Vendor_ID").isNull,col("b.Vendor_ID")).otherwise(col("a.Vendor_ID")))
.select(col("a.Sale_ID"),col("a.Product_ID"),col("Quantity_Sold"),col("Vendor_ID"),col("a.Sale_Date"),col("a.Sale_Amount"),col("a.Sale_Currency"))

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。