如何解决spark中的迭代过滤器似乎不起作用
我正在尝试一个一个地删除 RDD 的元素,但这不起作用,因为元素重新出现。
这是我的代码的一部分:
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0,3]
所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。
然而,我不明白为什么,因为每次我将过滤器获得的新rdd保存在“rdd”中,所以它不应该保留所有转换吗?如果没有,我该怎么办?
谢谢你指出我哪里错了!
解决方法
结果实际上是正确的 - 这不是 Spark 的错误。注意 lambda 函数定义为 x != i
,i
没有代入 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像
rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)
等
由于过滤器都是一样的,都会用最新的i
代替,所以每次for循环只过滤掉一项。
为避免这种情况,您可以使用偏函数来确保将 i
替换到函数中:
from functools import partial
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd = rdd.filter(partial(lambda x,i: x != i,i))
print(rdd.collect())
或者您可以使用reduce
:
from functools import reduce
rdd = spark.sparkContext.parallelize([0,2])
rdd = reduce(lambda r,i: r.filter(lambda x: x != i),range(3),rdd)
print(rdd.collect())
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。