微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用pyspark在映射阶段过滤rdd的行

如何解决使用pyspark在映射阶段过滤rdd的行

我正在 pyspark 中使用此代码过滤数据集:

rdd = sc.textFile("location...").map(lambda line: line.split(",")). \
         filter(lambda line :condition...)

我的问题是这样的:在我的解决方案的伪代码中,可以在映射阶段过滤不符合我的条件的行,从而解析整个数据集一次。但是在这种情况下,数据集被解析2倍更贵。

有没有办法通过一个解析来做到这一点?

解决方法

由于您的代码映射部分是在过滤之前完成的,如果您想提供更多优化并且您的映射函数输出不需要过滤,在这种情况下,建议在映射之前进行过滤,这样可以减少映射函数的输入元素个数

映射前过滤

rdd = sc.textFile("location...").filter(lambda line: line.split(",")). \
     map(lambda line :condition...)

另外,如果你想在映射函数中提供一些过滤逻辑,这是可以做到的,但是在这种情况下,你需要在最后过滤 NONE 类型的元素。

words = sc.parallelize(
["my code","java is a required","hadoop is a framework","spark","akka","spark vs hadoop","pyspark","pyspark and spark"]
)

def mapCondition(line):
   if(line.startswith("p")):
       return line

tokenized = words.map(lambda line: mapCondition(line))
print tokenized.collect()
,

您可以在过滤器 lambda 函数中直接对 line.split(",") 进行操作。例如,您可以比较第一个逗号之前的字符串,如下所示:

rdd = sc.textFile("location...").filter(lambda line: line.split(",")[0] = "string")

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。