微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用值序列 spark 过滤来自 RDD 的数据

如何解决使用值序列 spark 过滤来自 RDD 的数据

我需要有关以下用例的帮助:

问题 1:我的 RDD 格式如下。现在从这个 RDD 中,我想排除 airport.code in("PUN","HAR","KAS")

    case class airport(code:String,city:String,airportname:String)
    val airportRdd=sparkSession.sparkContext.textFile("src/main/resources/airport_data.csv").
          map(x=>x.split(","))
    val airPortRddTransformed=airportRdd.map(x=>airport(x(0),x(1),x(2)))
    val trasnformedRdd=airPortRddTransformed.filter(air=>!(air.code.contains(seqValues:_*)))
  

但是!不起作用。它告诉我无法解析符号!。有人可以帮助我。如何在 RDD 中进行否定。我只能使用 RDD 方法

还有一个问题:

问题 2: 数据文件有 70 列。我有一个列序列:

val seqColumns=List("lat","longi","height","country") 

我想在加载 RDD 时排除这些列。我该怎么做。我的生产 RDD 有 70 列,我只知道要排除的列名。不是每列的索引。再次在RDD 方法。我知道如何在 Dataframe 方法中做到这一点。

解决方法

问题 1

使用 broadcast 将值列表传递给 filter 函数。过滤器中的 _* 似乎不起作用。我将条件更改为 !seqValues.value.contains(air.code)

数据样本:airport_data.csv

C001,Pune,Pune Airport
C002,Mumbai,Chhatrapati Shivaji Maharaj International Airport
C003,New York,New York Airport
C004,Delhi,Delhi Airport

代码片段

case class airport(code:String,city:String,airportname:String)
val seqValues=spark.sparkContext.broadcast(List("C001","C003"))
val airportRdd = spark.sparkContext.textFile("D:\\DataAnalysis\\airport_data.csv").map(x=>x.split(","))
val airPortRddTransformed = airportRdd.map(x=>airport(x(0),x(1),x(2)))
//airPortRddTransformed.foreach(println)
val trasnformedRdd = airPortRddTransformed.filter(air => !seqValues.value.contains(air.code))
trasnformedRdd.foreach(println)

输出 ->

airport(C002,Chhatrapati Shivaji Maharaj International Airport)
airport(C004,Delhi Airport)
,

我会改变的事情:

1- 您正在将 .csv 作为 TextFile 读取,然后根据 , 拆分行。您可以通过阅读文件来保存这一步:

val df = spark.read.csv("src/main/resources/airport_data.csv")

2- 更改 contains

的顺序
val trasnformedRdd = airPortRddTransformed.filter(air => !(seqValues.contains(air.code)))

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。