微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

加入两个 RDD 然后按另一列分组

如何解决加入两个 RDD 然后按另一列分组

我有 2 个 RDD,第一个的格式为 Code: string,Name: string,而 rdd2 的格式为 Code: string,Year: string,Delay: float

rdd1 = [('a','name1'),('b','name2')]
rdd2 = [('a','2000',1.25),('a',2.0),'2010',-1.0)]

我想执行连接(在 code 上),以便我可以按 name 对数据进行分组,以便在 delay 上进行计数、平均值、最小值和最大值等聚合。

我尝试在执行连接后压平值,如下所示:

joined = rdd1.join(rdd2).map(lambda (keys,values): (keys,) + values)

但出现错误:缺少 1 个必需的位置参数。

我的加入结果也只显示 [('code',('name','year'))],不包括延迟值。我应该如何解决这个问题?

解决方法

这在 Python 3.x 中不起作用,因为删除了对元组参数解包 (PEP-3113) 的支持。因此,类型错误。

配对 RDD 连接作为键值工作

(a,b) 加入 (a,c) 会给你 (a,(b,c))

因此,使其工作的一种方法是:

joined = rdd1.join(rdd2.map(lambda x: (x[0],x[1:])))
joined.map(lambda x: (x[0],)+ (x[1][0],) + x[1][1]).collect()
# Output
# [('b','name2','2010',-1.0),# ('a','name1','2000',1.25),2.0)]
,

在加入之前,您需要确保 rdd2 的形式为 (key,value)。否则第二个之后的元素将被丢弃。

rdd3 = rdd1.join(rdd2.map(lambda x: (x[0],(x[1],x[2]))))

rdd3.collect()
# [('b',('name2',('2010',-1.0))),('a',('name1',('2000',1.25))),2.0)))]

如果要删除嵌套结构,可以再添加一个mapValues

rdd3 = rdd1.join(rdd2.map(lambda x: (x[0],x[2])))).mapValues(lambda x: (x[0],x[1][0],x[1][1]))

rdd3.collect()
# [('b',-1.0)),1.25)),2.0))]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。