微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark中的迭代过滤器似乎不起作用

如何解决spark中的迭代过滤器似乎不起作用

我正在尝试一个一个删除 RDD 的元素,但这不起作用,因为元素重新出现。

这是我的代码的一部分:

rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0,3]

所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。

然而,我不明白为什么,因为每次我将过滤器获得的新rdd保存在“rdd”中,所以它不应该保留所有转换吗?如果没有,我该怎么办?

谢谢你指出我哪里错了!

解决方法

结果实际上是正确的 - 这不是 Spark 的错误。注意 lambda 函数定义为 x != ii 没有代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像

rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)

由于过滤器都是一样的,都会用最新的i代替,所以每次for循环只过滤掉一项。

为避免这种情况,您可以使用偏函数来确保将 i 替换到函数中:

from functools import partial
 
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd = rdd.filter(partial(lambda x,i: x != i,i))

print(rdd.collect())

或者您可以使用reduce

from functools import reduce

rdd = spark.sparkContext.parallelize([0,2])
rdd = reduce(lambda r,i: r.filter(lambda x: x != i),range(3),rdd)
print(rdd.collect())

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。